|
IBR-DTNSuite 0.6
|
00001 /* 00002 * EpidemicRoutingExtension.cpp 00003 * 00004 * Created on: 18.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/epidemic/EpidemicRoutingExtension.h" 00009 #include "routing/epidemic/EpidemicControlMessage.h" 00010 00011 #include "routing/QueueBundleEvent.h" 00012 #include "net/TransferCompletedEvent.h" 00013 #include "net/TransferAbortedEvent.h" 00014 #include "core/BundleExpiredEvent.h" 00015 #include "core/NodeEvent.h" 00016 #include "core/TimeEvent.h" 00017 #include "core/Node.h" 00018 #include "net/ConnectionManager.h" 00019 #include "net/ConnectionEvent.h" 00020 #include "Configuration.h" 00021 #include "core/BundleCore.h" 00022 #include "core/BundleEvent.h" 00023 00024 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00025 #include <ibrdtn/data/MetaBundle.h> 00026 #include <ibrcommon/thread/MutexLock.h> 00027 #include <ibrcommon/Logger.h> 00028 #include <ibrcommon/AutoDelete.h> 00029 00030 #include <functional> 00031 #include <list> 00032 #include <algorithm> 00033 00034 #include <iomanip> 00035 #include <ios> 00036 #include <iostream> 00037 #include <set> 00038 00039 #include <stdlib.h> 00040 #include <typeinfo> 00041 00042 namespace dtn 00043 { 00044 namespace routing 00045 { 00046 EpidemicRoutingExtension::EpidemicRoutingExtension() 00047 : _endpoint(_taskqueue, _purge_vector) 00048 { 00049 // write something to the syslog 00050 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL; 00051 } 00052 00053 EpidemicRoutingExtension::~EpidemicRoutingExtension() 00054 { 00055 stop(); 00056 join(); 00057 } 00058 00059 void EpidemicRoutingExtension::stopExtension() 00060 { 00061 _taskqueue.abort(); 00062 } 00063 00064 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt) 00065 { 00066 // If an incoming bundle is received, forward it to all connected neighbors 00067 try { 00068 dynamic_cast<const QueueBundleEvent&>(*evt); 00069 00070 // new bundles trigger a recheck for all neighbors 00071 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00072 00073 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00074 { 00075 const dtn::core::Node &n = (*iter); 00076 00077 // transfer the next bundle to this destination 00078 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00079 } 00080 return; 00081 } catch (const std::bad_cast&) { }; 00082 00083 // On each time event look for expired stuff 00084 try { 00085 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00086 00087 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00088 { 00089 // check all lists for expired entries 00090 _taskqueue.push( new ExpireTask(time.getTimestamp()) ); 00091 } 00092 return; 00093 } catch (const std::bad_cast&) { }; 00094 00095 // If a new neighbor comes available, send him a request for the summary vector 00096 // If a neighbor went away we can free the stored summary vector 00097 try { 00098 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00099 const dtn::core::Node &n = nodeevent.getNode(); 00100 00101 if (nodeevent.getAction() == NODE_AVAILABLE) 00102 { 00103 // query a new summary vector from this neighbor 00104 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), n.getEID(), _endpoint ) ); 00105 } 00106 00107 return; 00108 } catch (const std::bad_cast&) { }; 00109 00110 try { 00111 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt); 00112 00113 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP) 00114 { 00115 // query a new summary vector from this neighbor 00116 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), ce.peer, _endpoint ) ); 00117 } 00118 return; 00119 } catch (const std::bad_cast&) { }; 00120 00121 // The bundle transfer has been aborted 00122 try { 00123 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00124 00125 // transfer the next bundle to this destination 00126 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) ); 00127 00128 return; 00129 } catch (const std::bad_cast&) { }; 00130 00131 // A bundle transfer was successful 00132 try { 00133 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00134 00135 // create a transfer completed task 00136 _taskqueue.push( new TransferCompletedTask( completed.getPeer(), completed.getBundle() ) ); 00137 return; 00138 } catch (const std::bad_cast&) { }; 00139 } 00140 00141 bool EpidemicRoutingExtension::__cancellation() 00142 { 00143 _taskqueue.abort(); 00144 return true; 00145 } 00146 00147 void EpidemicRoutingExtension::run() 00148 { 00149 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00150 { 00151 public: 00152 BundleFilter(const NeighborDatabase::NeighborEntry &entry) 00153 : _entry(entry) 00154 {}; 00155 00156 virtual ~BundleFilter() {}; 00157 00158 virtual size_t limit() const { return 10; }; 00159 00160 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00161 { 00162 // check Scope Control Block - do not forward bundles with hop limit == 0 00163 if (meta.hopcount == 0) 00164 { 00165 return false; 00166 } 00167 00168 // check Scope Control Block - do not forward non-group bundles with hop limit <= 1 00169 if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))) 00170 { 00171 return false; 00172 } 00173 00174 // do not forward any epidemic control message 00175 // this is done by the neighbor routing module 00176 if (meta.source == (dtn::core::BundleCore::local + "/routing/epidemic")) 00177 { 00178 return false; 00179 } 00180 00181 // do not forward to any blacklisted destination 00182 const dtn::data::EID dest = meta.destination.getNode(); 00183 if (_blacklist.find(dest) != _blacklist.end()) 00184 { 00185 return false; 00186 } 00187 00188 // do not forward bundles already known by the destination 00189 // throws BloomfilterNotAvailableException if no filter is available or it is expired 00190 if (_entry.has(meta, true)) 00191 { 00192 return false; 00193 } 00194 00195 return true; 00196 }; 00197 00198 void blacklist(const dtn::data::EID& id) 00199 { 00200 _blacklist.insert(id); 00201 }; 00202 00203 private: 00204 std::set<dtn::data::EID> _blacklist; 00205 const NeighborDatabase::NeighborEntry &_entry; 00206 }; 00207 00208 dtn::core::BundleStorage &storage = (**this).getStorage(); 00209 00210 while (true) 00211 { 00212 try { 00213 Task *t = _taskqueue.getnpop(true); 00214 ibrcommon::AutoDelete<Task> killer(t); 00215 00216 IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00217 00218 try { 00222 try { 00223 ExecutableTask &etask = dynamic_cast<ExecutableTask&>(*t); 00224 etask.execute(); 00225 } catch (const std::bad_cast&) { }; 00226 00230 try { 00231 ExpireTask &task = dynamic_cast<ExpireTask&>(*t); 00232 _purge_vector.expire(task.timestamp); 00233 } catch (const std::bad_cast&) { }; 00234 00240 try { 00241 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00242 NeighborDatabase &db = (**this).getNeighborDB(); 00243 00244 ibrcommon::MutexLock l(db); 00245 NeighborDatabase::NeighborEntry &entry = db.get(task.eid); 00246 00247 try { 00248 // get the bundle filter of the neighbor 00249 BundleFilter filter(entry); 00250 00251 // some debug output 00252 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00253 00254 // blacklist the neighbor itself, because this is handled by neighbor routing extension 00255 filter.blacklist(task.eid); 00256 00257 // query some unknown bundle from the storage, the list contains max. 10 items. 00258 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00259 00260 // send the bundles as long as we have resources 00261 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00262 { 00263 try { 00264 // transfer the bundle to the neighbor 00265 transferTo(entry, *iter); 00266 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00267 } 00268 } catch (const NeighborDatabase::BloomfilterNotAvailableException&) { 00269 // query a new summary vector from this neighbor 00270 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), task.eid, _endpoint ) ); 00271 } 00272 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00273 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00274 } catch (const std::bad_cast&) { }; 00275 00279 try { 00280 TransferCompletedTask &task = dynamic_cast<TransferCompletedTask&>(*t); 00281 00282 try { 00283 // add this bundle to the purge vector if it is delivered to its destination 00284 if (( task.peer.getNode() == task.meta.destination.getNode() ) && (task.meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00285 { 00286 IBRCOMMON_LOGGER(notice) << "singleton bundle added to purge vector: " << task.meta.toString() << IBRCOMMON_LOGGER_ENDL; 00287 00288 // add it to the purge vector 00289 _purge_vector.add(task.meta); 00290 } 00291 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00292 00293 // transfer the next bundle to this destination 00294 _taskqueue.push( new SearchNextBundleTask( task.peer ) ); 00295 } catch (const std::bad_cast&) { }; 00296 00300 try { 00301 const ProcessEpidemicBundleTask &task = dynamic_cast<ProcessEpidemicBundleTask&>(*t); 00302 processECM(task.bundle); 00303 } catch (const std::bad_cast&) { }; 00304 00305 } catch (const ibrcommon::Exception &ex) { 00306 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00307 } 00308 } catch (const std::exception&) { 00309 return; 00310 } 00311 00312 yield(); 00313 } 00314 } 00315 00316 /****************************************/ 00317 00318 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00319 : eid(e) 00320 { } 00321 00322 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00323 { } 00324 00325 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString() 00326 { 00327 return "SearchNextBundleTask: " + eid.getString(); 00328 } 00329 00330 /****************************************/ 00331 00332 EpidemicRoutingExtension::ProcessEpidemicBundleTask::ProcessEpidemicBundleTask(const dtn::data::Bundle &b) 00333 : bundle(b) 00334 { } 00335 00336 EpidemicRoutingExtension::ProcessEpidemicBundleTask::~ProcessEpidemicBundleTask() 00337 { } 00338 00339 std::string EpidemicRoutingExtension::ProcessEpidemicBundleTask::toString() 00340 { 00341 return "ProcessEpidemicBundleTask: " + bundle.toString(); 00342 } 00343 00344 /****************************************/ 00345 00346 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t) 00347 : timestamp(t) 00348 { } 00349 00350 EpidemicRoutingExtension::ExpireTask::~ExpireTask() 00351 { } 00352 00353 std::string EpidemicRoutingExtension::ExpireTask::toString() 00354 { 00355 return "ExpireTask"; 00356 } 00357 00358 /****************************************/ 00359 00360 EpidemicRoutingExtension::TransferCompletedTask::TransferCompletedTask(const dtn::data::EID &e, const dtn::data::MetaBundle &m) 00361 : peer(e), meta(m) 00362 { } 00363 00364 EpidemicRoutingExtension::TransferCompletedTask::~TransferCompletedTask() 00365 { } 00366 00367 std::string EpidemicRoutingExtension::TransferCompletedTask::toString() 00368 { 00369 return "TransferCompletedTask"; 00370 } 00371 00372 /****************************************/ 00373 00374 EpidemicRoutingExtension::QuerySummaryVectorTask::QuerySummaryVectorTask(NeighborDatabase &db, const dtn::data::EID &o, EpidemicEndpoint &e) 00375 : ndb(db), origin(o), endpoint(e) 00376 { } 00377 00378 EpidemicRoutingExtension::QuerySummaryVectorTask::~QuerySummaryVectorTask() 00379 { } 00380 00381 void EpidemicRoutingExtension::QuerySummaryVectorTask::execute() const 00382 { 00383 try { 00384 // lock the list of neighbors 00385 ibrcommon::MutexLock l(ndb); 00386 NeighborDatabase::NeighborEntry &entry = ndb.get(origin); 00387 00388 // acquire resources to send a summary vector request 00389 entry.acquireFilterRequest(); 00390 00391 // call the query method at the epidemic endpoint instance 00392 endpoint.query(origin); 00393 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { 00394 } catch (const NeighborDatabase::NeighborNotAvailableException&) { }; 00395 } 00396 00397 std::string EpidemicRoutingExtension::QuerySummaryVectorTask::toString() 00398 { 00399 return "QuerySummaryVectorTask: " + origin.getString(); 00400 } 00401 00402 EpidemicRoutingExtension::EpidemicEndpoint::EpidemicEndpoint(ibrcommon::Queue<EpidemicRoutingExtension::Task* > &queue, dtn::routing::BundleSummary &purge) 00403 : _taskqueue(queue), _purge_vector(purge) 00404 { 00405 AbstractWorker::initialize("/routing/epidemic", true); 00406 } 00407 00408 EpidemicRoutingExtension::EpidemicEndpoint::~EpidemicEndpoint() 00409 { 00410 } 00411 00412 void EpidemicRoutingExtension::EpidemicEndpoint::callbackBundleReceived(const Bundle &b) 00413 { 00414 _taskqueue.push( new ProcessEpidemicBundleTask(b) ); 00415 } 00416 00417 void EpidemicRoutingExtension::EpidemicEndpoint::send(const dtn::data::Bundle &b) 00418 { 00419 transmit(b); 00420 } 00421 00422 void EpidemicRoutingExtension::EpidemicEndpoint::query(const dtn::data::EID &origin) 00423 { 00424 // create a new request for the summary vector of the neighbor 00425 EpidemicControlMessage ecm; 00426 00427 // set message type 00428 ecm.type = EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR; 00429 00430 // create a new bundle 00431 dtn::data::Bundle req; 00432 00433 // set the source of the bundle 00434 req._source = dtn::core::BundleCore::local + "/routing/epidemic"; 00435 00436 // set the destination of the bundle 00437 req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00438 req._destination = origin + "/routing/epidemic"; 00439 00440 // limit the lifetime to 60 seconds 00441 req._lifetime = 60; 00442 00443 // set high priority 00444 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00445 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00446 00447 dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>(); 00448 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00449 00450 // serialize the request into the payload 00451 { 00452 ibrcommon::BLOB::iostream ios = ref.iostream(); 00453 (*ios) << ecm; 00454 } 00455 00456 // add a schl block 00457 dtn::data::ScopeControlHopLimitBlock &schl = req.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00458 schl.setLimit(1); 00459 00460 // send the bundle 00461 transmit(req); 00462 } 00463 00464 void EpidemicRoutingExtension::processECM(const dtn::data::Bundle &bundle) 00465 { 00466 // read the ecm 00467 const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>(); 00468 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00469 EpidemicControlMessage ecm; 00470 00471 // locked within this region 00472 { 00473 ibrcommon::BLOB::iostream s = ref.iostream(); 00474 (*s) >> ecm; 00475 } 00476 00477 // if this is a request answer with an summary vector 00478 if (ecm.type == EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR) 00479 { 00480 // create a new request for the summary vector of the neighbor 00481 EpidemicControlMessage response_ecm; 00482 00483 // set message type 00484 response_ecm.type = EpidemicControlMessage::ECM_RESPONSE; 00485 00486 // add own summary vector to the message 00487 const SummaryVector vec = (**this).getSummaryVector(); 00488 response_ecm.setSummaryVector(vec); 00489 00490 // add own purge vector to the message 00491 response_ecm.setPurgeVector(_purge_vector); 00492 00493 // create a new bundle 00494 dtn::data::Bundle answer; 00495 00496 // set the source of the bundle 00497 answer._source = dtn::core::BundleCore::local + "/routing/epidemic"; 00498 00499 // set the destination of the bundle 00500 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true); 00501 answer._destination = bundle._source; 00502 00503 // limit the lifetime to 60 seconds 00504 answer._lifetime = 60; 00505 00506 // set high priority 00507 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false); 00508 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true); 00509 00510 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>(); 00511 ibrcommon::BLOB::Reference ref = p.getBLOB(); 00512 00513 // serialize the request into the payload 00514 { 00515 ibrcommon::BLOB::iostream ios = ref.iostream(); 00516 (*ios) << response_ecm; 00517 } 00518 00519 // add a schl block 00520 dtn::data::ScopeControlHopLimitBlock &schl = answer.push_front<dtn::data::ScopeControlHopLimitBlock>(); 00521 schl.setLimit(1); 00522 00523 // transfer the bundle to the neighbor 00524 _endpoint.send(answer); 00525 } 00526 else if (ecm.type == EpidemicControlMessage::ECM_RESPONSE) 00527 { 00528 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_SUMMARY_VECTOR) 00529 { 00530 // get the summary vector (bloomfilter) of this ECM 00531 const ibrcommon::BloomFilter &filter = ecm.getSummaryVector().getBloomFilter(); 00532 00538 try { 00539 NeighborDatabase &db = (**this).getNeighborDB(); 00540 ibrcommon::MutexLock l(db); 00541 NeighborDatabase::NeighborEntry &entry = db.get(bundle._source.getNode()); 00542 entry.update(filter, bundle._lifetime); 00543 00544 // trigger the search-for-next-bundle procedure 00545 _taskqueue.push( new SearchNextBundleTask( entry.eid ) ); 00546 } catch (const NeighborDatabase::NeighborNotAvailableException&) { }; 00547 } 00548 00549 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_PURGE_VECTOR) 00550 { 00551 // get the purge vector (bloomfilter) of this ECM 00552 const ibrcommon::BloomFilter &purge = ecm.getPurgeVector().getBloomFilter(); 00553 00554 dtn::core::BundleStorage &storage = (**this).getStorage(); 00555 00556 try { 00557 while (true) 00558 { 00559 // delete bundles in the purge vector 00560 const dtn::data::MetaBundle meta = storage.remove(purge); 00561 00562 // log the purged bundle 00563 IBRCOMMON_LOGGER(notice) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00564 00565 // gen a report 00566 dtn::core::BundleEvent::raise(meta, BUNDLE_DELETED, StatusReportBlock::DEPLETED_STORAGE); 00567 00568 // add this bundle to the own purge vector 00569 _purge_vector.add(meta); 00570 } 00571 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00572 } 00573 } 00574 } 00575 } 00576 }