Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "routing/EpidemicRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "net/TransferAbortedEvent.h"
00012 #include "core/BundleExpiredEvent.h"
00013 #include "core/NodeEvent.h"
00014 #include "core/TimeEvent.h"
00015 #include "core/Node.h"
00016 #include "net/ConnectionManager.h"
00017 #include "Configuration.h"
00018 #include "core/BundleCore.h"
00019 #include "core/SimpleBundleStorage.h"
00020
00021 #include <ibrdtn/data/MetaBundle.h>
00022 #include <ibrcommon/thread/MutexLock.h>
00023 #include <ibrcommon/Logger.h>
00024 #include <ibrcommon/AutoDelete.h>
00025
00026 #include <functional>
00027 #include <list>
00028 #include <algorithm>
00029
00030 #include <iomanip>
00031 #include <ios>
00032 #include <iostream>
00033
00034 #include <stdlib.h>
00035 #include <typeinfo>
00036
00037 namespace dtn
00038 {
00039 namespace routing
00040 {
00041 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00042 bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00043 return n1 == n2;
00044 }
00045 };
00046
00047 const dtn::data::EID EpidemicRoutingExtension::EPIDEMIC_ROUTING_ADDRESS("dtn:epidemic-routing");
00048
00049 EpidemicRoutingExtension::EpidemicRoutingExtension()
00050 {
00051
00052 dtn::daemon::Configuration &conf = dtn::daemon::Configuration::getInstance();
00053
00054
00055 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module for node " << conf.getNodename() << IBRCOMMON_LOGGER_ENDL;
00056
00057
00058 getRouter()->getStorage().addToFilter(EPIDEMIC_ROUTING_ADDRESS);
00059
00060
00061
00062 _epidemic_bundle._lifetime = 10;
00063 _epidemic_bundle._source = dtn::core::BundleCore::local;
00064 _epidemic_bundle._destination = EPIDEMIC_ROUTING_ADDRESS;
00065
00066
00067 _epidemic_bundle.push_back<EpidemicExtensionBlock>();
00068 }
00069
00070 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00071 {
00072 stop();
00073 join();
00074 }
00075
00076 void EpidemicRoutingExtension::stopExtension()
00077 {
00078 _taskqueue.abort();
00079 }
00080
00081 void EpidemicRoutingExtension::update(std::string &name, std::string &data)
00082 {
00083 name = "epidemic";
00084 data = "version=1";
00085 }
00086
00087 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00088 {
00089 try {
00090 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00091 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00092 return;
00093 } catch (std::bad_cast ex) { };
00094
00095 try {
00096 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00097
00098 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00099 {
00100
00101 _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00102 }
00103 return;
00104 } catch (std::bad_cast ex) { };
00105
00106 try {
00107 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00108
00109 dtn::core::Node n = nodeevent.getNode();
00110 dtn::data::EID eid(n.getURI());
00111
00112 switch (nodeevent.getAction())
00113 {
00114 case NODE_AVAILABLE:
00115 {
00116 ibrcommon::MutexLock l(_list_mutex);
00117 _neighbors.setAvailable(eid);
00118 }
00119 _taskqueue.push( new TransferSummaryVectorTask( eid ) );
00120 _taskqueue.push( new SearchNextBundleTask( eid ) );
00121 break;
00122
00123 case NODE_UNAVAILABLE:
00124 {
00125 ibrcommon::MutexLock l(_list_mutex);
00126 _neighbors.setUnavailable(eid);
00127 }
00128 break;
00129
00130 default:
00131 break;
00132 }
00133 return;
00134 } catch (std::bad_cast ex) { };
00135
00136
00137 try {
00138 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00139 dtn::data::EID eid = aborted.getPeer();
00140 dtn::data::BundleID id = aborted.getBundleID();
00141
00142
00143 ibrcommon::MutexLock l(_list_mutex);
00144 NeighborDatabase::NeighborEntry &entry = _neighbors.get(eid);
00145 entry.releaseTransfer();
00146
00147 switch (aborted.reason)
00148 {
00149 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00150 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00151 break;
00152
00153 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00154 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00155 {
00156
00157 _taskqueue.push( new SearchNextBundleTask( eid ) );
00158 break;
00159 }
00160
00161
00162 case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00163 {
00164
00165 ibrcommon::BloomFilter &bf = entry._filter;
00166 bf.insert(id.toString());
00167
00168 if (IBRCOMMON_LOGGER_LEVEL >= 40)
00169 {
00170 IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL;
00171 }
00172
00173
00174 _taskqueue.push( new SearchNextBundleTask( eid ) );
00175 break;
00176 }
00177 }
00178 return;
00179 } catch (std::bad_cast ex) { };
00180
00181 try {
00182 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00183
00184 dtn::data::EID eid = completed.getPeer();
00185 dtn::data::MetaBundle meta = completed.getBundle();
00186
00187
00188 if ( EID(eid.getNodeEID()) == EID(meta.destination.getNodeEID()) )
00189 {
00190 try {
00191
00192
00193 getRouter()->getStorage().remove(meta);
00194
00195 IBRCOMMON_LOGGER_DEBUG(15) << "bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00196
00197
00198 _purge_vector.add(meta);
00199 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00200
00201 }
00202
00203
00204 ibrcommon::MutexLock l(_list_mutex);
00205 NeighborDatabase::NeighborEntry &entry = _neighbors.get(eid);
00206
00207
00208 entry.releaseTransfer();
00209 }
00210 else
00211 {
00212
00213 ibrcommon::MutexLock l(_list_mutex);
00214 NeighborDatabase::NeighborEntry &entry = _neighbors.get(eid);
00215
00216
00217 entry.releaseTransfer();
00218
00219 ibrcommon::BloomFilter &bf = entry._filter;
00220 bf.insert(meta.toString());
00221
00222 if (IBRCOMMON_LOGGER_LEVEL >= 40)
00223 {
00224 IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL;
00225 }
00226 }
00227
00228
00229 _taskqueue.push( new SearchNextBundleTask( eid ) );
00230
00231 return;
00232 } catch (std::bad_cast ex) { };
00233 }
00234
00235 void EpidemicRoutingExtension::prepareEpidemicInfo(dtn::data::Bundle &b)
00236 {
00237
00238 try {
00239 getRouter()->getStorage().remove(b);
00240 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00241
00242
00243 b.relabel();
00244
00245
00246 getRouter()->setKnown(b);
00247
00248
00249 ibrcommon::MutexLock l(_list_mutex);
00250
00251
00252 EpidemicExtensionBlock &eblock = b.getBlock<EpidemicExtensionBlock>();
00253
00254
00255 const SummaryVector vec = getRouter()->getSummaryVector();
00256
00257
00258 eblock.setSummaryVector(vec);
00259 eblock.setPurgeVector(_purge_vector);
00260
00261
00262 getRouter()->getStorage().store(b);
00263 }
00264
00265 bool EpidemicRoutingExtension::__cancellation()
00266 {
00267 _taskqueue.abort();
00268 return true;
00269 }
00270
00271 void EpidemicRoutingExtension::run()
00272 {
00273 dtn::routing::BaseRouter &router = (*getRouter());
00274 dtn::core::BundleStorage &storage = router.getStorage();
00275
00276 bool sendVectorUpdate = false;
00277
00278 while (true)
00279 {
00280 try {
00281 Task *t = _taskqueue.getnpop(true);
00282 ibrcommon::AutoDelete<Task> killer(t);
00283
00284 IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00285
00286 try {
00291 try {
00292 ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00293
00294
00295 if (sendVectorUpdate)
00296 {
00297
00298 sendVectorUpdate = false;
00299
00300
00301 prepareEpidemicInfo(_epidemic_bundle);
00302
00303 ibrcommon::MutexLock l(_list_mutex);
00304 std::set<dtn::data::EID> list = _neighbors.getAvailable();
00305
00306 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00307 {
00308 _taskqueue.push( new TransferSummaryVectorTask( *iter ) );
00309 }
00310 }
00311
00312 else if ((task.timestamp % 5) == 0)
00313 {
00314 prepareEpidemicInfo(_epidemic_bundle);
00315 }
00316
00317 ibrcommon::MutexLock l(_list_mutex);
00318 _purge_vector.expire(task.timestamp);
00319 } catch (std::bad_cast) { };
00320
00326 try {
00327 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00328
00329
00330 ibrcommon::MutexLock l(_list_mutex);
00331 NeighborDatabase::NeighborEntry &entry = _neighbors.get(task.eid);
00332
00333 try {
00334
00335 entry.acquireTransfer();
00336
00337
00338 IBRCOMMON_LOGGER_DEBUG(40) << "search one bundle not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00339
00340 if (entry._lastupdate > 0)
00341 {
00342 try {
00343
00344
00345 ibrcommon::BloomFilter &bf = entry._filter;
00346 const dtn::data::BundleID b = storage.getByFilter(bf);
00347 router.transferTo(task.eid, b);
00348 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00349
00350
00351 const dtn::data::BundleID b = storage.getByDestination(task.eid, false);
00352 router.transferTo(task.eid, b);
00353 }
00354 }
00355 else
00356 {
00357
00358
00359 const dtn::data::BundleID b = storage.getByDestination(task.eid, false);
00360 router.transferTo(task.eid, b);
00361 }
00362 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00363 entry.releaseTransfer();
00364 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { };
00365 } catch (std::bad_cast) { };
00366
00370 try {
00371 TransferSummaryVectorTask &task = dynamic_cast<TransferSummaryVectorTask&>(*t);
00372
00373
00374 router.transferTo(task.eid, _epidemic_bundle);
00375 } catch (std::bad_cast) { };
00376
00380 try {
00381 ProcessBundleTask &task = dynamic_cast<ProcessBundleTask&>(*t);
00382
00383
00384 if (task.bundle.destination == EPIDEMIC_ROUTING_ADDRESS)
00385 {
00386
00387 dtn::data::Bundle bundle = storage.get(task.bundle);
00388
00389
00390 const EpidemicExtensionBlock &ext = bundle.getBlock<EpidemicExtensionBlock>();
00391
00392
00393 const ibrcommon::BloomFilter &filter = ext.getSummaryVector().getBloomFilter();
00394 const ibrcommon::BloomFilter &purge = ext.getPurgeVector().getBloomFilter();
00395
00401 {
00402 ibrcommon::MutexLock l(_list_mutex);
00403 _neighbors.updateBundles(bundle._source, filter);
00404 }
00405
00406
00407 _taskqueue.push( new SearchNextBundleTask( task.origin ) );
00408
00409 while (true)
00410 {
00411
00412 dtn::data::MetaBundle meta = storage.remove(purge);
00413
00414 IBRCOMMON_LOGGER_DEBUG(15) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00415
00416
00417 _purge_vector.add(meta);
00418 }
00419
00420
00421 storage.remove(bundle);
00422 }
00423 else
00424 {
00425 ibrcommon::MutexLock l(_list_mutex);
00426
00427
00428 sendVectorUpdate = true;
00429
00430
00431 std::set<dtn::data::EID> list = _neighbors.getAvailable();
00432
00433 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00434 {
00435
00436 _taskqueue.push( new SearchNextBundleTask( *iter ) );
00437 }
00438 }
00439 } catch (dtn::data::Bundle::NoSuchBlockFoundException) {
00440
00441 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00442
00443 } catch (std::bad_cast) { };
00444 } catch (ibrcommon::Exception ex) {
00445 IBRCOMMON_LOGGER(error) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00446 }
00447 } catch (std::exception) {
00448 return;
00449 }
00450
00451 yield();
00452 }
00453 }
00454
00455 dtn::data::Block* EpidemicRoutingExtension::EpidemicExtensionBlock::Factory::create()
00456 {
00457 return new EpidemicRoutingExtension::EpidemicExtensionBlock();
00458 }
00459
00460 EpidemicRoutingExtension::EpidemicExtensionBlock::EpidemicExtensionBlock()
00461 : dtn::data::Block(EpidemicExtensionBlock::BLOCK_TYPE), _data("forwarded through epidemic routing")
00462 {
00463 }
00464
00465 EpidemicRoutingExtension::EpidemicExtensionBlock::~EpidemicExtensionBlock()
00466 {
00467 }
00468
00469 void EpidemicRoutingExtension::EpidemicExtensionBlock::setSummaryVector(const SummaryVector &vector)
00470 {
00471 _vector = vector;
00472 }
00473
00474 const SummaryVector& EpidemicRoutingExtension::EpidemicExtensionBlock::getSummaryVector() const
00475 {
00476 return _vector;
00477 }
00478
00479 void EpidemicRoutingExtension::EpidemicExtensionBlock::setPurgeVector(const SummaryVector &vector)
00480 {
00481 _purge = vector;
00482 }
00483
00484 const SummaryVector& EpidemicRoutingExtension::EpidemicExtensionBlock::getPurgeVector() const
00485 {
00486 return _purge;
00487 }
00488
00489 void EpidemicRoutingExtension::EpidemicExtensionBlock::set(dtn::data::SDNV value)
00490 {
00491 _counter = value;
00492 }
00493
00494 dtn::data::SDNV EpidemicRoutingExtension::EpidemicExtensionBlock::get() const
00495 {
00496 return _counter;
00497 }
00498
00499 size_t EpidemicRoutingExtension::EpidemicExtensionBlock::getLength() const
00500 {
00501 return _counter.getLength() + _data.getLength() + _vector.getLength();
00502 }
00503
00504 std::istream& EpidemicRoutingExtension::EpidemicExtensionBlock::deserialize(std::istream &stream)
00505 {
00506 stream >> _counter;
00507 stream >> _data;
00508 stream >> _vector;
00509 stream >> _purge;
00510
00511
00512 dtn::data::Block::set(dtn::data::Block::FORWARDED_WITHOUT_PROCESSED, false);
00513
00514 return stream;
00515 }
00516
00517 std::ostream &EpidemicRoutingExtension::EpidemicExtensionBlock::serialize(std::ostream &stream) const
00518 {
00519 stream << _counter;
00520 stream << _data;
00521 stream << _vector;
00522 stream << _purge;
00523
00524 return stream;
00525 }
00526
00527
00528
00529 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00530 : eid(e)
00531 { }
00532
00533 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00534 { }
00535
00536 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00537 {
00538 return "SearchNextBundleTask: " + eid.getString();
00539 }
00540
00541
00542
00543 EpidemicRoutingExtension::TransferSummaryVectorTask::TransferSummaryVectorTask(const dtn::data::EID &e)
00544 : eid(e)
00545 { }
00546
00547 EpidemicRoutingExtension::TransferSummaryVectorTask::~TransferSummaryVectorTask()
00548 { }
00549
00550 std::string EpidemicRoutingExtension::TransferSummaryVectorTask::toString()
00551 {
00552 return "TransferSummaryVectorTask: " + eid.getString();
00553 }
00554
00555
00556
00557 EpidemicRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00558 : bundle(meta), origin(o)
00559 { }
00560
00561 EpidemicRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00562 { }
00563
00564 std::string EpidemicRoutingExtension::ProcessBundleTask::toString()
00565 {
00566 return "ProcessBundleTask: " + bundle.toString();
00567 }
00568
00569
00570
00571 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00572 : timestamp(t)
00573 { }
00574
00575 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00576 { }
00577
00578 std::string EpidemicRoutingExtension::ExpireTask::toString()
00579 {
00580 return "ExpireTask";
00581 }
00582 }
00583 }