Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "routing/epidemic/EpidemicRoutingExtension.h"
00009 #include "routing/epidemic/EpidemicControlMessage.h"
00010
00011 #include "routing/QueueBundleEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/BundleExpiredEvent.h"
00015 #include "core/NodeEvent.h"
00016 #include "core/TimeEvent.h"
00017 #include "core/Node.h"
00018 #include "net/ConnectionManager.h"
00019 #include "Configuration.h"
00020 #include "core/BundleCore.h"
00021 #include "core/BundleGeneratedEvent.h"
00022
00023 #include <ibrdtn/data/MetaBundle.h>
00024 #include <ibrcommon/thread/MutexLock.h>
00025 #include <ibrcommon/Logger.h>
00026 #include <ibrcommon/AutoDelete.h>
00027
00028 #include <functional>
00029 #include <list>
00030 #include <algorithm>
00031
00032 #include <iomanip>
00033 #include <ios>
00034 #include <iostream>
00035 #include <set>
00036
00037 #include <stdlib.h>
00038 #include <typeinfo>
00039
00040 namespace dtn
00041 {
00042 namespace routing
00043 {
00044 EpidemicRoutingExtension::EpidemicRoutingExtension()
00045 {
00046
00047 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
00048 }
00049
00050 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00051 {
00052 stop();
00053 join();
00054 }
00055
00056 void EpidemicRoutingExtension::stopExtension()
00057 {
00058 _taskqueue.abort();
00059 }
00060
00061 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00062 {
00063
00064 try {
00065 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00066 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00067 return;
00068 } catch (const std::bad_cast&) { };
00069
00070
00071 try {
00072 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00073
00074 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00075 {
00076
00077 _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00078 }
00079 return;
00080 } catch (const std::bad_cast&) { };
00081
00082
00083
00084 try {
00085 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00086 const dtn::core::Node &n = nodeevent.getNode();
00087
00088 if (nodeevent.getAction() == NODE_AVAILABLE)
00089 {
00090 NeighborDatabase &db = (**this).getNeighborDB();
00091
00092 try {
00093
00094 ibrcommon::MutexLock l(db);
00095 NeighborDatabase::NeighborEntry &entry = db.get(n.getEID());
00096
00097
00098 entry.acquireFilterRequest();
00099
00100
00101 _taskqueue.push( new QuerySummaryVectorTask( n.getEID() ) );
00102 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00103 } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00104 }
00105
00106 return;
00107 } catch (const std::bad_cast&) { };
00108
00109
00110 try {
00111 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00112
00113
00114 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00115
00116 return;
00117 } catch (const std::bad_cast&) { };
00118
00119
00120 try {
00121 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00122
00123
00124 _taskqueue.push( new TransferCompletedTask( completed.getPeer(), completed.getBundle() ) );
00125 return;
00126 } catch (const std::bad_cast&) { };
00127 }
00128
00129 bool EpidemicRoutingExtension::__cancellation()
00130 {
00131 _taskqueue.abort();
00132 return true;
00133 }
00134
00135 void EpidemicRoutingExtension::run()
00136 {
00137 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00138 {
00139 public:
00140 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00141 : _entry(entry)
00142 {};
00143
00144 virtual ~BundleFilter() {};
00145
00146 virtual size_t limit() const { return 10; };
00147
00148 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00149 {
00150
00151
00152 if (meta.source == (dtn::core::BundleCore::local + "/routing/epidemic"))
00153 {
00154 return false;
00155 }
00156
00157
00158 const dtn::data::EID dest = meta.destination.getNodeEID();
00159 if (_blacklist.find(dest) != _blacklist.end())
00160 {
00161 return false;
00162 }
00163
00164
00165
00166 if (_entry.has(meta, true))
00167 {
00168 return false;
00169 }
00170
00171 return true;
00172 };
00173
00174 void blacklist(const dtn::data::EID& id)
00175 {
00176 _blacklist.insert(id);
00177 };
00178
00179 private:
00180 std::set<dtn::data::EID> _blacklist;
00181 const NeighborDatabase::NeighborEntry &_entry;
00182 };
00183
00184 dtn::core::BundleStorage &storage = (**this).getStorage();
00185
00186 while (true)
00187 {
00188 try {
00189 Task *t = _taskqueue.getnpop(true);
00190 ibrcommon::AutoDelete<Task> killer(t);
00191
00192 IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00193
00194 try {
00198 try {
00199 ExecutableTask &etask = dynamic_cast<ExecutableTask&>(*t);
00200 etask.execute();
00201 } catch (const std::bad_cast&) { };
00202
00206 try {
00207 ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00208 _purge_vector.expire(task.timestamp);
00209 } catch (const std::bad_cast&) { };
00210
00216 try {
00217 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00218 NeighborDatabase &db = (**this).getNeighborDB();
00219
00220 ibrcommon::MutexLock l(db);
00221 NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00222
00223 try {
00224
00225 BundleFilter filter(entry);
00226
00227
00228 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00229
00230
00231 filter.blacklist(task.eid);
00232
00233
00234 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00235
00236
00237 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00238 {
00239 try {
00240
00241 transferTo(entry, *iter);
00242 } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00243 }
00244 } catch (const NeighborDatabase::BloomfilterNotAvailableException&) {
00245
00246 entry.acquireFilterRequest();
00247
00248
00249 _taskqueue.push( new QuerySummaryVectorTask( task.eid ) );
00250 }
00251 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00252 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00253 } catch (const std::bad_cast&) { };
00254
00258 try {
00259 TransferCompletedTask &task = dynamic_cast<TransferCompletedTask&>(*t);
00260
00261
00262 if (( EID(task.peer.getNodeEID()) == EID(task.meta.destination.getNodeEID()) ) && (task.meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00263 {
00264 IBRCOMMON_LOGGER_DEBUG(15) << "singleton bundle delivered: " << task.meta.toString() << IBRCOMMON_LOGGER_ENDL;
00265
00266
00267 _purge_vector.add(task.meta);
00268 }
00269
00270
00271 _taskqueue.push( new SearchNextBundleTask( task.peer ) );
00272 } catch (const std::bad_cast&) { };
00273
00277 try {
00278 ProcessBundleTask &task = dynamic_cast<ProcessBundleTask&>(*t);
00279
00280
00281 if ( task.bundle.destination == (dtn::core::BundleCore::local + "/routing/epidemic") )
00282 {
00283
00284 dtn::data::Bundle bundle = storage.get(task.bundle);
00285
00286
00287 processECM(task.origin, bundle);
00288
00289
00290 storage.remove(bundle);
00291 }
00292 else
00293 {
00294
00295 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00296
00297 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00298 {
00299 const dtn::core::Node &n = (*iter);
00300
00301
00302 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00303 }
00304 }
00305 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00306
00307 } catch (const std::bad_cast&) { };
00308 } catch (const ibrcommon::Exception &ex) {
00309 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00310 }
00311 } catch (const std::exception&) {
00312 return;
00313 }
00314
00315 yield();
00316 }
00317 }
00318
00319
00320
00321 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00322 : eid(e)
00323 { }
00324
00325 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00326 { }
00327
00328 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00329 {
00330 return "SearchNextBundleTask: " + eid.getString();
00331 }
00332
00333
00334
00335 EpidemicRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00336 : bundle(meta), origin(o)
00337 { }
00338
00339 EpidemicRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00340 { }
00341
00342 std::string EpidemicRoutingExtension::ProcessBundleTask::toString()
00343 {
00344 return "ProcessBundleTask: " + bundle.toString();
00345 }
00346
00347
00348
00349 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00350 : timestamp(t)
00351 { }
00352
00353 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00354 { }
00355
00356 std::string EpidemicRoutingExtension::ExpireTask::toString()
00357 {
00358 return "ExpireTask";
00359 }
00360
00361
00362
00363 EpidemicRoutingExtension::TransferCompletedTask::TransferCompletedTask(const dtn::data::EID &e, const dtn::data::MetaBundle &m)
00364 : peer(e), meta(m)
00365 { }
00366
00367 EpidemicRoutingExtension::TransferCompletedTask::~TransferCompletedTask()
00368 { }
00369
00370 std::string EpidemicRoutingExtension::TransferCompletedTask::toString()
00371 {
00372 return "TransferCompletedTask";
00373 }
00374
00375
00376
00377 EpidemicRoutingExtension::QuerySummaryVectorTask::QuerySummaryVectorTask(const dtn::data::EID &o)
00378 : origin(o)
00379 { }
00380
00381 EpidemicRoutingExtension::QuerySummaryVectorTask::~QuerySummaryVectorTask()
00382 { }
00383
00384 void EpidemicRoutingExtension::QuerySummaryVectorTask::execute() const
00385 {
00386
00387 EpidemicControlMessage ecm;
00388
00389
00390 ecm.type = EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR;
00391
00392
00393 dtn::data::Bundle req;
00394
00395
00396 req._source = dtn::core::BundleCore::local + "/routing/epidemic";
00397
00398
00399 req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00400 req._destination = origin + "/routing/epidemic";
00401
00402
00403 req._lifetime = 60;
00404
00405
00406 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true);
00407 req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00408
00409 dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
00410 ibrcommon::BLOB::Reference ref = p.getBLOB();
00411
00412
00413 {
00414 ibrcommon::BLOB::iostream ios = ref.iostream();
00415 (*ios) << ecm;
00416 }
00417
00418
00419 dtn::core::BundleGeneratedEvent::raise(req);
00420 }
00421
00422 std::string EpidemicRoutingExtension::QuerySummaryVectorTask::toString()
00423 {
00424 return "QuerySummaryVectorTask: " + origin.getString();
00425 }
00426
00427 void EpidemicRoutingExtension::processECM(const dtn::data::EID &origin, const dtn::data::Bundle &bundle)
00428 {
00429
00430 const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>();
00431 ibrcommon::BLOB::Reference ref = p.getBLOB();
00432 EpidemicControlMessage ecm;
00433
00434
00435 {
00436 ibrcommon::BLOB::iostream s = ref.iostream();
00437 (*s) >> ecm;
00438 }
00439
00440
00441 if (ecm.type == EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR)
00442 {
00443
00444 EpidemicControlMessage response_ecm;
00445
00446
00447 response_ecm.type = EpidemicControlMessage::ECM_RESPONSE;
00448
00449
00450 const SummaryVector vec = (**this).getSummaryVector();
00451 response_ecm.setSummaryVector(vec);
00452
00453
00454 response_ecm.setPurgeVector(_purge_vector);
00455
00456
00457 dtn::data::Bundle answer;
00458
00459
00460 answer._source = dtn::core::BundleCore::local + "/routing/epidemic";
00461
00462
00463 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00464 answer._destination = bundle._source;
00465
00466
00467 answer._lifetime = 60;
00468
00469
00470 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true);
00471 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00472
00473 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>();
00474 ibrcommon::BLOB::Reference ref = p.getBLOB();
00475
00476
00477 {
00478 ibrcommon::BLOB::iostream ios = ref.iostream();
00479 (*ios) << response_ecm;
00480 }
00481
00482
00483 dtn::core::BundleGeneratedEvent::raise(answer);
00484 }
00485 else if (ecm.type == EpidemicControlMessage::ECM_RESPONSE)
00486 {
00487 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_SUMMARY_VECTOR)
00488 {
00489
00490 const ibrcommon::BloomFilter &filter = ecm.getSummaryVector().getBloomFilter();
00491
00497 try {
00498 NeighborDatabase &db = (**this).getNeighborDB();
00499 ibrcommon::MutexLock l(db);
00500 NeighborDatabase::NeighborEntry &entry = db.get(bundle._source.getNodeEID());
00501 entry.update(filter, bundle._lifetime);
00502
00503
00504 _taskqueue.push( new SearchNextBundleTask( origin ) );
00505 } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00506 }
00507
00508 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_PURGE_VECTOR)
00509 {
00510
00511 const ibrcommon::BloomFilter &purge = ecm.getPurgeVector().getBloomFilter();
00512
00513 dtn::core::BundleStorage &storage = (**this).getStorage();
00514
00515 try {
00516 while (true)
00517 {
00518
00519 const dtn::data::MetaBundle meta = storage.remove(purge);
00520
00521 IBRCOMMON_LOGGER_DEBUG(15) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00522
00523
00524 _purge_vector.add(meta);
00525 }
00526 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00527 }
00528 }
00529 }
00530 }
00531 }