• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/routing/EpidemicRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * EpidemicRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/EpidemicRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "net/TransferAbortedEvent.h"
00012 #include "core/BundleExpiredEvent.h"
00013 #include "core/NodeEvent.h"
00014 #include "core/TimeEvent.h"
00015 #include "core/Node.h"
00016 #include "net/ConnectionManager.h"
00017 #include "Configuration.h"
00018 #include "core/BundleCore.h"
00019 #include "core/SimpleBundleStorage.h"
00020 
00021 #include <ibrdtn/data/MetaBundle.h>
00022 #include <ibrcommon/thread/MutexLock.h>
00023 #include <ibrcommon/Logger.h>
00024 #include <ibrcommon/AutoDelete.h>
00025 
00026 #include <functional>
00027 #include <list>
00028 #include <algorithm>
00029 
00030 #include <iomanip>
00031 #include <ios>
00032 #include <iostream>
00033 
00034 #include <stdlib.h>
00035 #include <typeinfo>
00036 
00037 namespace dtn
00038 {
00039         namespace routing
00040         {
00041                 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00042                         bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00043                                 return n1 == n2;
00044                         }
00045                 };
00046 
00047                 const dtn::data::EID EpidemicRoutingExtension::EPIDEMIC_ROUTING_ADDRESS("dtn:epidemic-routing");
00048 
00049                 EpidemicRoutingExtension::EpidemicRoutingExtension()
00050                 {
00051                         // get configuration
00052                         dtn::daemon::Configuration &conf = dtn::daemon::Configuration::getInstance();
00053 
00054                         // write something to the syslog
00055                         IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module for node " << conf.getNodename() << IBRCOMMON_LOGGER_ENDL;
00056 
00057                         // add epidemic bundles to the storage destination filter
00058                         getRouter()->getStorage().addToFilter(EPIDEMIC_ROUTING_ADDRESS);
00059 
00060                         // build the base epidemic info bundle
00061                         // set basics (lifetime, source, destination)
00062                         _epidemic_bundle._lifetime = 10;
00063                         _epidemic_bundle._source = dtn::core::BundleCore::local;
00064                         _epidemic_bundle._destination = EPIDEMIC_ROUTING_ADDRESS;
00065 
00066                         // create an epidemic extension block
00067                         _epidemic_bundle.push_back<EpidemicExtensionBlock>();
00068                 }
00069 
00070                 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00071                 {
00072                         stop();
00073                         join();
00074                 }
00075 
00076                 void EpidemicRoutingExtension::stopExtension()
00077                 {
00078                         _taskqueue.abort();
00079                 }
00080 
00081                 void EpidemicRoutingExtension::update(std::string &name, std::string &data)
00082                 {
00083                         name = "epidemic";
00084                         data = "version=1";
00085                 }
00086 
00087                 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00088                 {
00089                         try {
00090                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00091                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00092                                 return;
00093                         } catch (std::bad_cast ex) { };
00094 
00095                         try {
00096                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00097 
00098                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00099                                 {
00100                                         // check all lists for expired entries
00101                                         _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00102                                 }
00103                                 return;
00104                         } catch (std::bad_cast ex) { };
00105 
00106                         try {
00107                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00108 
00109                                 dtn::core::Node n = nodeevent.getNode();
00110                                 dtn::data::EID eid(n.getURI());
00111 
00112                                 switch (nodeevent.getAction())
00113                                 {
00114                                         case NODE_AVAILABLE:
00115                                         {
00116                                                 ibrcommon::MutexLock l(_list_mutex);
00117                                                 _neighbors.setAvailable(eid);
00118                                         }
00119                                         _taskqueue.push( new TransferSummaryVectorTask( eid ) );
00120                                         _taskqueue.push( new SearchNextBundleTask( eid ) );
00121                                         break;
00122 
00123                                         case NODE_UNAVAILABLE:
00124                                         {
00125                                                 ibrcommon::MutexLock l(_list_mutex);
00126                                                 _neighbors.setUnavailable(eid);
00127                                         }
00128                                         break;
00129 
00130                                         default:
00131                                                 break;
00132                                 }
00133                                 return;
00134                         } catch (std::bad_cast ex) { };
00135 
00136                         // TransferAbortedEvent: send next bundle
00137                         try {
00138                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00139                                 dtn::data::EID eid = aborted.getPeer();
00140                                 dtn::data::BundleID id = aborted.getBundleID();
00141 
00142                                 // lock the list of neighbors
00143                                 ibrcommon::MutexLock l(_list_mutex);
00144                                 NeighborDatabase::NeighborEntry &entry = _neighbors.get(eid);
00145                                 entry.releaseTransfer();
00146 
00147                                 switch (aborted.reason)
00148                                 {
00149                                         case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00150                                         case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00151                                                 break;
00152 
00153                                         case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00154                                         case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00155                                         {
00156                                                 // transfer the next bundle to this destination
00157                                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00158                                                 break;
00159                                         }
00160 
00161                                         // How to exclude several bundles? Combine two bloomfilters? Use a blocklist. Delay transfers to the node?
00162                                         case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00163                                         {
00164                                                 // add the transferred bundle to the bloomfilter of the receiver
00165                                                 ibrcommon::BloomFilter &bf = entry._filter;
00166                                                 bf.insert(id.toString());
00167 
00168                                                 if (IBRCOMMON_LOGGER_LEVEL >= 40)
00169                                                 {
00170                                                         IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL;
00171                                                 }
00172 
00173                                                 // transfer the next bundle to this destination
00174                                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00175                                                 break;
00176                                         }
00177                                 }
00178                                 return;
00179                         } catch (std::bad_cast ex) { };
00180 
00181                         try {
00182                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00183 
00184                                 dtn::data::EID eid = completed.getPeer();
00185                                 dtn::data::MetaBundle meta = completed.getBundle();
00186 
00187                                 // delete the bundle in the storage if
00188                                 if ( EID(eid.getNodeEID()) == EID(meta.destination.getNodeEID()) )
00189                                 {
00190                                         try {
00191                                                 // bundle has been delivered to its destination
00192                                                 // delete it from our storage
00193                                                 getRouter()->getStorage().remove(meta);
00194 
00195                                                 IBRCOMMON_LOGGER_DEBUG(15) << "bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00196 
00197                                                 // add it to the purge vector
00198                                                 _purge_vector.add(meta);
00199                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00200 
00201                                         }
00202 
00203                                         // lock the list of bloom filters
00204                                         ibrcommon::MutexLock l(_list_mutex);
00205                                         NeighborDatabase::NeighborEntry &entry = _neighbors.get(eid);
00206 
00207                                         // release resources for transmission
00208                                         entry.releaseTransfer();
00209                                 }
00210                                 else
00211                                 {
00212                                         // add the transferred bundle to the bloomfilter of the receiver
00213                                         ibrcommon::MutexLock l(_list_mutex);
00214                                         NeighborDatabase::NeighborEntry &entry = _neighbors.get(eid);
00215 
00216                                         // release resources for transmission
00217                                         entry.releaseTransfer();
00218 
00219                                         ibrcommon::BloomFilter &bf = entry._filter;
00220                                         bf.insert(meta.toString());
00221 
00222                                         if (IBRCOMMON_LOGGER_LEVEL >= 40)
00223                                         {
00224                                                 IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL;
00225                                         }
00226                                 }
00227 
00228                                 // transfer the next bundle to this destination
00229                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00230 
00231                                 return;
00232                         } catch (std::bad_cast ex) { };
00233                 }
00234 
00235                 void EpidemicRoutingExtension::prepareEpidemicInfo(dtn::data::Bundle &b)
00236                 {
00237                         // delete the old epidemic bundle
00238                         try {
00239                                 getRouter()->getStorage().remove(b);
00240                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00241 
00242                         // relabel the bundle with a new timestamp and sequence number
00243                         b.relabel();
00244 
00245                         // add the epidemic bundle to the list of known bundles
00246                         getRouter()->setKnown(b);
00247 
00248                         // lock the lists
00249                         ibrcommon::MutexLock l(_list_mutex);
00250 
00251                         // create an epidemic extension block
00252                         EpidemicExtensionBlock &eblock = b.getBlock<EpidemicExtensionBlock>();
00253 
00254                         // get the global summary vector of this daemon
00255                         const SummaryVector vec = getRouter()->getSummaryVector();
00256 
00257                         // set the summary and purge vector
00258                         eblock.setSummaryVector(vec);
00259                         eblock.setPurgeVector(_purge_vector);
00260 
00261                         // store the bundle in the storage
00262                         getRouter()->getStorage().store(b);
00263                 }
00264 
00265                 bool EpidemicRoutingExtension::__cancellation()
00266                 {
00267                         _taskqueue.abort();
00268                         return true;
00269                 }
00270 
00271                 void EpidemicRoutingExtension::run()
00272                 {
00273                         dtn::routing::BaseRouter &router = (*getRouter());
00274                         dtn::core::BundleStorage &storage = router.getStorage();
00275 
00276                         bool sendVectorUpdate = false;
00277 
00278                         while (true)
00279                         {
00280                                 try {
00281                                         Task *t = _taskqueue.getnpop(true);
00282                                         ibrcommon::AutoDelete<Task> killer(t);
00283 
00284                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00285 
00286                                         try {
00291                                                 try {
00292                                                         ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00293 
00294                                                         // should i send a vector update?
00295                                                         if (sendVectorUpdate)
00296                                                         {
00297                                                                 // set the update indicator to false
00298                                                                 sendVectorUpdate = false;
00299 
00300                                                                 // prepare a new epidemic bundle
00301                                                                 prepareEpidemicInfo(_epidemic_bundle);
00302 
00303                                                                 ibrcommon::MutexLock l(_list_mutex);
00304                                                                 std::set<dtn::data::EID> list = _neighbors.getAvailable();
00305 
00306                                                                 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00307                                                                 {
00308                                                                         _taskqueue.push( new TransferSummaryVectorTask( *iter ) );
00309                                                                 }
00310                                                         }
00311                                                         // prepare a new epidemic bundle every 5 seconds
00312                                                         else if ((task.timestamp % 5) == 0)
00313                                                         {
00314                                                                 prepareEpidemicInfo(_epidemic_bundle);
00315                                                         }
00316 
00317                                                         ibrcommon::MutexLock l(_list_mutex);
00318                                                         _purge_vector.expire(task.timestamp);
00319                                                 } catch (std::bad_cast) { };
00320 
00326                                                 try {
00327                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00328 
00329                                                         // lock the neighbor list
00330                                                         ibrcommon::MutexLock l(_list_mutex);
00331                                                         NeighborDatabase::NeighborEntry &entry = _neighbors.get(task.eid);
00332 
00333                                                         try {
00334                                                                 // acquire resources for transmission
00335                                                                 entry.acquireTransfer();
00336 
00337                                                                 // some debug output
00338                                                                 IBRCOMMON_LOGGER_DEBUG(40) << "search one bundle not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00339 
00340                                                                 if (entry._lastupdate > 0)
00341                                                                 {
00342                                                                         try {
00343                                                                                 // the neighbor supports the epidemic routing scheme
00344                                                                                 // forward all bundles not known by him
00345                                                                                 ibrcommon::BloomFilter &bf = entry._filter;
00346                                                                                 const dtn::data::BundleID b = storage.getByFilter(bf);
00347                                                                                 router.transferTo(task.eid, b);
00348                                                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00349                                                                                 // if the bloomfilter query returns no bundle, then query for the
00350                                                                                 // direct node EID
00351                                                                                 const dtn::data::BundleID b = storage.getByDestination(task.eid, false);
00352                                                                                 router.transferTo(task.eid, b);
00353                                                                         }
00354                                                                 }
00355                                                                 else
00356                                                                 {
00357                                                                         // the neighbor does not seems to support epidemic routing
00358                                                                         // only forward bundles with him as destination
00359                                                                         const dtn::data::BundleID b = storage.getByDestination(task.eid, false);
00360                                                                         router.transferTo(task.eid, b);
00361                                                                 }
00362                                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00363                                                                 entry.releaseTransfer();
00364                                                         } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { };
00365                                                 } catch (std::bad_cast) { };
00366 
00370                                                 try {
00371                                                         TransferSummaryVectorTask &task = dynamic_cast<TransferSummaryVectorTask&>(*t);
00372 
00373                                                         // then transfer the bundle to the destination
00374                                                         router.transferTo(task.eid, _epidemic_bundle);
00375                                                 } catch (std::bad_cast) { };
00376 
00380                                                 try {
00381                                                         ProcessBundleTask &task = dynamic_cast<ProcessBundleTask&>(*t);
00382 
00383                                                         // check for special addresses
00384                                                         if (task.bundle.destination == EPIDEMIC_ROUTING_ADDRESS)
00385                                                         {
00386                                                                 // get the bundle out of the storage
00387                                                                 dtn::data::Bundle bundle = storage.get(task.bundle);
00388 
00389                                                                 // get all epidemic extension blocks of this bundle
00390                                                                 const EpidemicExtensionBlock &ext = bundle.getBlock<EpidemicExtensionBlock>();
00391 
00392                                                                 // get the bloomfilter of this bundle
00393                                                                 const ibrcommon::BloomFilter &filter = ext.getSummaryVector().getBloomFilter();
00394                                                                 const ibrcommon::BloomFilter &purge = ext.getPurgeVector().getBloomFilter();
00395 
00401                                                                 {
00402                                                                         ibrcommon::MutexLock l(_list_mutex);
00403                                                                         _neighbors.updateBundles(bundle._source, filter);
00404                                                                 }
00405 
00406                                                                 // trigger the search-for-next-bundle procedure
00407                                                                 _taskqueue.push( new SearchNextBundleTask( task.origin ) );
00408 
00409                                                                 while (true)
00410                                                                 {
00411                                                                         // delete bundles in the purge vector
00412                                                                         dtn::data::MetaBundle meta = storage.remove(purge);
00413 
00414                                                                         IBRCOMMON_LOGGER_DEBUG(15) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00415 
00416                                                                         // add this bundle to the own purge vector
00417                                                                         _purge_vector.add(meta);
00418                                                                 }
00419 
00420                                                                 // remove the bundle
00421                                                                 storage.remove(bundle);
00422                                                         }
00423                                                         else
00424                                                         {
00425                                                                 ibrcommon::MutexLock l(_list_mutex);
00426 
00427                                                                 // trigger new transmission of the summary vector
00428                                                                 sendVectorUpdate = true;
00429 
00430                                                                 // trigger transmission for all neighbors
00431                                                                 std::set<dtn::data::EID> list = _neighbors.getAvailable();
00432 
00433                                                                 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00434                                                                 {
00435                                                                         // transfer the next bundle to this destination
00436                                                                         _taskqueue.push( new SearchNextBundleTask( *iter ) );
00437                                                                 }
00438                                                         }
00439                                                 } catch (dtn::data::Bundle::NoSuchBlockFoundException) {
00440                                                         // if the bundle does not contains the expected block
00441                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00442                                                         // if the bundle is not in the storage we have nothing to do
00443                                                 } catch (std::bad_cast) { };
00444                                         } catch (ibrcommon::Exception ex) {
00445                                                 IBRCOMMON_LOGGER(error) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00446                                         }
00447                                 } catch (std::exception) {
00448                                         return;
00449                                 }
00450 
00451                                 yield();
00452                         }
00453                 }
00454 
00455                 dtn::data::Block* EpidemicRoutingExtension::EpidemicExtensionBlock::Factory::create()
00456                 {
00457                         return new EpidemicRoutingExtension::EpidemicExtensionBlock();
00458                 }
00459 
00460                 EpidemicRoutingExtension::EpidemicExtensionBlock::EpidemicExtensionBlock()
00461                  : dtn::data::Block(EpidemicExtensionBlock::BLOCK_TYPE), _data("forwarded through epidemic routing")
00462                 {
00463                 }
00464 
00465                 EpidemicRoutingExtension::EpidemicExtensionBlock::~EpidemicExtensionBlock()
00466                 {
00467                 }
00468 
00469                 void EpidemicRoutingExtension::EpidemicExtensionBlock::setSummaryVector(const SummaryVector &vector)
00470                 {
00471                         _vector = vector;
00472                 }
00473 
00474                 const SummaryVector& EpidemicRoutingExtension::EpidemicExtensionBlock::getSummaryVector() const
00475                 {
00476                         return _vector;
00477                 }
00478 
00479                 void EpidemicRoutingExtension::EpidemicExtensionBlock::setPurgeVector(const SummaryVector &vector)
00480                 {
00481                         _purge = vector;
00482                 }
00483 
00484                 const SummaryVector& EpidemicRoutingExtension::EpidemicExtensionBlock::getPurgeVector() const
00485                 {
00486                         return _purge;
00487                 }
00488 
00489                 void EpidemicRoutingExtension::EpidemicExtensionBlock::set(dtn::data::SDNV value)
00490                 {
00491                         _counter = value;
00492                 }
00493 
00494                 dtn::data::SDNV EpidemicRoutingExtension::EpidemicExtensionBlock::get() const
00495                 {
00496                         return _counter;
00497                 }
00498 
00499                 size_t EpidemicRoutingExtension::EpidemicExtensionBlock::getLength() const
00500                 {
00501                         return _counter.getLength() + _data.getLength() + _vector.getLength();
00502                 }
00503 
00504                 std::istream& EpidemicRoutingExtension::EpidemicExtensionBlock::deserialize(std::istream &stream)
00505                 {
00506                         stream >> _counter;
00507                         stream >> _data;
00508                         stream >> _vector;
00509                         stream >> _purge;
00510 
00511                         // unset block not processed bit
00512                         dtn::data::Block::set(dtn::data::Block::FORWARDED_WITHOUT_PROCESSED, false);
00513 
00514                         return stream;
00515                 }
00516 
00517                 std::ostream &EpidemicRoutingExtension::EpidemicExtensionBlock::serialize(std::ostream &stream) const
00518                 {
00519                         stream << _counter;
00520                         stream << _data;
00521                         stream << _vector;
00522                         stream << _purge;
00523 
00524                         return stream;
00525                 }
00526 
00527                 /****************************************/
00528 
00529                 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00530                  : eid(e)
00531                 { }
00532 
00533                 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00534                 { }
00535 
00536                 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00537                 {
00538                         return "SearchNextBundleTask: " + eid.getString();
00539                 }
00540 
00541                 /****************************************/
00542 
00543                 EpidemicRoutingExtension::TransferSummaryVectorTask::TransferSummaryVectorTask(const dtn::data::EID &e)
00544                  : eid(e)
00545                 { }
00546 
00547                 EpidemicRoutingExtension::TransferSummaryVectorTask::~TransferSummaryVectorTask()
00548                 { }
00549 
00550                 std::string EpidemicRoutingExtension::TransferSummaryVectorTask::toString()
00551                 {
00552                         return "TransferSummaryVectorTask: " + eid.getString();
00553                 }
00554 
00555                 /****************************************/
00556 
00557                 EpidemicRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00558                  : bundle(meta), origin(o)
00559                 { }
00560 
00561                 EpidemicRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00562                 { }
00563 
00564                 std::string EpidemicRoutingExtension::ProcessBundleTask::toString()
00565                 {
00566                         return "ProcessBundleTask: " + bundle.toString();
00567                 }
00568 
00569                 /****************************************/
00570 
00571                 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00572                  : timestamp(t)
00573                 { }
00574 
00575                 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00576                 { }
00577 
00578                 std::string EpidemicRoutingExtension::ExpireTask::toString()
00579                 {
00580                         return "ExpireTask";
00581                 }
00582         }
00583 }

Generated on Thu Nov 11 2010 09:49:47 for IBR-DTNSuite by  doxygen 1.7.1