• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/routing/epidemic/EpidemicRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * EpidemicRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/epidemic/EpidemicRoutingExtension.h"
00009 #include "routing/epidemic/EpidemicControlMessage.h"
00010 
00011 #include "routing/QueueBundleEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/BundleExpiredEvent.h"
00015 #include "core/NodeEvent.h"
00016 #include "core/TimeEvent.h"
00017 #include "core/Node.h"
00018 #include "net/ConnectionManager.h"
00019 #include "Configuration.h"
00020 #include "core/BundleCore.h"
00021 #include "core/BundleGeneratedEvent.h"
00022 
00023 #include <ibrdtn/data/MetaBundle.h>
00024 #include <ibrcommon/thread/MutexLock.h>
00025 #include <ibrcommon/Logger.h>
00026 #include <ibrcommon/AutoDelete.h>
00027 
00028 #include <functional>
00029 #include <list>
00030 #include <algorithm>
00031 
00032 #include <iomanip>
00033 #include <ios>
00034 #include <iostream>
00035 #include <set>
00036 
00037 #include <stdlib.h>
00038 #include <typeinfo>
00039 
00040 namespace dtn
00041 {
00042         namespace routing
00043         {
00044                 EpidemicRoutingExtension::EpidemicRoutingExtension()
00045                 {
00046                         // write something to the syslog
00047                         IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
00048                 }
00049 
00050                 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00051                 {
00052                         stop();
00053                         join();
00054                 }
00055 
00056                 void EpidemicRoutingExtension::stopExtension()
00057                 {
00058                         _taskqueue.abort();
00059                 }
00060 
00061                 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00062                 {
00063                         // If an incoming bundle is received, forward it to all connected neighbors
00064                         try {
00065                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00066                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00067                                 return;
00068                         } catch (const std::bad_cast&) { };
00069 
00070                         // On each time event look for expired stuff
00071                         try {
00072                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00073 
00074                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00075                                 {
00076                                         // check all lists for expired entries
00077                                         _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00078                                 }
00079                                 return;
00080                         } catch (const std::bad_cast&) { };
00081 
00082                         // If a new neighbor comes available, send him a request for the summary vector
00083                         // If a neighbor went away we can free the stored summary vector
00084                         try {
00085                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00086                                 const dtn::core::Node &n = nodeevent.getNode();
00087 
00088                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00089                                 {
00090                                         NeighborDatabase &db = (**this).getNeighborDB();
00091 
00092                                         try {
00093                                                 // lock the list of neighbors
00094                                                 ibrcommon::MutexLock l(db);
00095                                                 NeighborDatabase::NeighborEntry &entry = db.get(n.getEID());
00096 
00097                                                 // acquire resources to send a summary vector request
00098                                                 entry.acquireFilterRequest();
00099 
00100                                                 // query a new summary vector from this neighbor
00101                                                 _taskqueue.push( new QuerySummaryVectorTask( n.getEID() ) );
00102                                         } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00103                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00104                                 }
00105 
00106                                 return;
00107                         } catch (const std::bad_cast&) { };
00108 
00109                         // The bundle transfer has been aborted
00110                         try {
00111                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00112 
00113                                 // transfer the next bundle to this destination
00114                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00115 
00116                                 return;
00117                         } catch (const std::bad_cast&) { };
00118 
00119                         // A bundle transfer was successful
00120                         try {
00121                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00122 
00123                                 // create a transfer completed task
00124                                 _taskqueue.push( new TransferCompletedTask( completed.getPeer(), completed.getBundle() ) );
00125                                 return;
00126                         } catch (const std::bad_cast&) { };
00127                 }
00128 
00129                 bool EpidemicRoutingExtension::__cancellation()
00130                 {
00131                         _taskqueue.abort();
00132                         return true;
00133                 }
00134 
00135                 void EpidemicRoutingExtension::run()
00136                 {
00137                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00138                         {
00139                         public:
00140                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00141                                  : _entry(entry)
00142                                 {};
00143 
00144                                 virtual ~BundleFilter() {};
00145 
00146                                 virtual size_t limit() const { return 10; };
00147 
00148                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00149                                 {
00150                                         // do not forward any epidemic control message
00151                                         // this is done by the neighbor routing module
00152                                         if (meta.source == (dtn::core::BundleCore::local + "/routing/epidemic"))
00153                                         {
00154                                                 return false;
00155                                         }
00156 
00157                                         // do not forward to any blacklisted destination
00158                                         const dtn::data::EID dest = meta.destination.getNodeEID();
00159                                         if (_blacklist.find(dest) != _blacklist.end())
00160                                         {
00161                                                 return false;
00162                                         }
00163 
00164                                         // do not forward bundles already known by the destination
00165                                         // throws BloomfilterNotAvailableException if no filter is available or it is expired
00166                                         if (_entry.has(meta, true))
00167                                         {
00168                                                 return false;
00169                                         }
00170 
00171                                         return true;
00172                                 };
00173 
00174                                 void blacklist(const dtn::data::EID& id)
00175                                 {
00176                                         _blacklist.insert(id);
00177                                 };
00178 
00179                         private:
00180                                 std::set<dtn::data::EID> _blacklist;
00181                                 const NeighborDatabase::NeighborEntry &_entry;
00182                         };
00183 
00184                         dtn::core::BundleStorage &storage = (**this).getStorage();
00185 
00186                         while (true)
00187                         {
00188                                 try {
00189                                         Task *t = _taskqueue.getnpop(true);
00190                                         ibrcommon::AutoDelete<Task> killer(t);
00191 
00192                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00193 
00194                                         try {
00198                                                 try {
00199                                                         ExecutableTask &etask = dynamic_cast<ExecutableTask&>(*t);
00200                                                         etask.execute();
00201                                                 } catch (const std::bad_cast&) { };
00202 
00206                                                 try {
00207                                                         ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00208                                                         _purge_vector.expire(task.timestamp);
00209                                                 } catch (const std::bad_cast&) { };
00210 
00216                                                 try {
00217                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00218                                                         NeighborDatabase &db = (**this).getNeighborDB();
00219 
00220                                                         ibrcommon::MutexLock l(db);
00221                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00222 
00223                                                         try {
00224                                                                 // get the bundle filter of the neighbor
00225                                                                 BundleFilter filter(entry);
00226 
00227                                                                 // some debug output
00228                                                                 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00229 
00230                                                                 // blacklist the neighbor itself, because this is handled by neighbor routing extension
00231                                                                 filter.blacklist(task.eid);
00232 
00233                                                                 // query some unknown bundle from the storage, the list contains max. 10 items.
00234                                                                 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00235 
00236                                                                 // send the bundles as long as we have resources
00237                                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00238                                                                 {
00239                                                                         try {
00240                                                                                 // transfer the bundle to the neighbor
00241                                                                                 transferTo(entry, *iter);
00242                                                                         } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00243                                                                 }
00244                                                         } catch (const NeighborDatabase::BloomfilterNotAvailableException&) {
00245                                                                 // acquire resources to send a summary vector request
00246                                                                 entry.acquireFilterRequest();
00247 
00248                                                                 // query a new summary vector from this neighbor
00249                                                                 _taskqueue.push( new QuerySummaryVectorTask( task.eid ) );
00250                                                         }
00251                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00252                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00253                                                 } catch (const std::bad_cast&) { };
00254 
00258                                                 try {
00259                                                         TransferCompletedTask &task = dynamic_cast<TransferCompletedTask&>(*t);
00260 
00261                                                         // add this bundle to the purge vector if it is delivered to its destination
00262                                                         if (( EID(task.peer.getNodeEID()) == EID(task.meta.destination.getNodeEID()) ) && (task.meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00263                                                         {
00264                                                                 IBRCOMMON_LOGGER_DEBUG(15) << "singleton bundle delivered: " << task.meta.toString() << IBRCOMMON_LOGGER_ENDL;
00265 
00266                                                                 // add it to the purge vector
00267                                                                 _purge_vector.add(task.meta);
00268                                                         }
00269 
00270                                                         // transfer the next bundle to this destination
00271                                                         _taskqueue.push( new SearchNextBundleTask( task.peer ) );
00272                                                 } catch (const std::bad_cast&) { };
00273 
00277                                                 try {
00278                                                         ProcessBundleTask &task = dynamic_cast<ProcessBundleTask&>(*t);
00279 
00280                                                         // look for ECM bundles
00281                                                         if ( task.bundle.destination == (dtn::core::BundleCore::local + "/routing/epidemic") )
00282                                                         {
00283                                                                 // the bundle is an control message, get it from the storage
00284                                                                 dtn::data::Bundle bundle = storage.get(task.bundle);
00285 
00286                                                                 // process the incoming ECM
00287                                                                 processECM(task.origin, bundle);
00288 
00289                                                                 // we do not need it anymore in the storage
00290                                                                 storage.remove(bundle);
00291                                                         }
00292                                                         else
00293                                                         {
00294                                                                 // new bundles trigger a recheck for all neighbors
00295                                                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00296 
00297                                                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00298                                                                 {
00299                                                                         const dtn::core::Node &n = (*iter);
00300 
00301                                                                         // transfer the next bundle to this destination
00302                                                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00303                                                                 }
00304                                                         }
00305                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00306                                                         // if the bundle is not in the storage we have nothing to do
00307                                                 } catch (const std::bad_cast&) { };
00308                                         } catch (const ibrcommon::Exception &ex) {
00309                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00310                                         }
00311                                 } catch (const std::exception&) {
00312                                         return;
00313                                 }
00314 
00315                                 yield();
00316                         }
00317                 }
00318 
00319                 /****************************************/
00320 
00321                 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00322                  : eid(e)
00323                 { }
00324 
00325                 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00326                 { }
00327 
00328                 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00329                 {
00330                         return "SearchNextBundleTask: " + eid.getString();
00331                 }
00332 
00333                 /****************************************/
00334 
00335                 EpidemicRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00336                  : bundle(meta), origin(o)
00337                 { }
00338 
00339                 EpidemicRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00340                 { }
00341 
00342                 std::string EpidemicRoutingExtension::ProcessBundleTask::toString()
00343                 {
00344                         return "ProcessBundleTask: " + bundle.toString();
00345                 }
00346 
00347                 /****************************************/
00348 
00349                 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00350                  : timestamp(t)
00351                 { }
00352 
00353                 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00354                 { }
00355 
00356                 std::string EpidemicRoutingExtension::ExpireTask::toString()
00357                 {
00358                         return "ExpireTask";
00359                 }
00360 
00361                 /****************************************/
00362 
00363                 EpidemicRoutingExtension::TransferCompletedTask::TransferCompletedTask(const dtn::data::EID &e, const dtn::data::MetaBundle &m)
00364                  : peer(e), meta(m)
00365                 { }
00366 
00367                 EpidemicRoutingExtension::TransferCompletedTask::~TransferCompletedTask()
00368                 { }
00369 
00370                 std::string EpidemicRoutingExtension::TransferCompletedTask::toString()
00371                 {
00372                         return "TransferCompletedTask";
00373                 }
00374 
00375                 /****************************************/
00376 
00377                 EpidemicRoutingExtension::QuerySummaryVectorTask::QuerySummaryVectorTask(const dtn::data::EID &o)
00378                  : origin(o)
00379                 { }
00380 
00381                 EpidemicRoutingExtension::QuerySummaryVectorTask::~QuerySummaryVectorTask()
00382                 { }
00383 
00384                 void EpidemicRoutingExtension::QuerySummaryVectorTask::execute() const
00385                 {
00386                         // create a new request for the summary vector of the neighbor
00387                         EpidemicControlMessage ecm;
00388 
00389                         // set message type
00390                         ecm.type = EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR;
00391 
00392                         // create a new bundle
00393                         dtn::data::Bundle req;
00394 
00395                         // set the source of the bundle
00396                         req._source = dtn::core::BundleCore::local + "/routing/epidemic";
00397 
00398                         // set the destination of the bundle
00399                         req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00400                         req._destination = origin + "/routing/epidemic";
00401 
00402                         // limit the lifetime to 60 seconds
00403                         req._lifetime = 60;
00404 
00405                         // set high priority
00406                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true);
00407                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00408 
00409                         dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
00410                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00411 
00412                         // serialize the request into the payload
00413                         {
00414                                 ibrcommon::BLOB::iostream ios = ref.iostream();
00415                                 (*ios) << ecm;
00416                         }
00417 
00418                         // transfer the bundle to the neighbor
00419                         dtn::core::BundleGeneratedEvent::raise(req);
00420                 }
00421 
00422                 std::string EpidemicRoutingExtension::QuerySummaryVectorTask::toString()
00423                 {
00424                         return "QuerySummaryVectorTask: " + origin.getString();
00425                 }
00426 
00427                 void EpidemicRoutingExtension::processECM(const dtn::data::EID &origin, const dtn::data::Bundle &bundle)
00428                 {
00429                         // read the ecm
00430                         const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>();
00431                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00432                         EpidemicControlMessage ecm;
00433 
00434                         // locked within this region
00435                         {
00436                                 ibrcommon::BLOB::iostream s = ref.iostream();
00437                                 (*s) >> ecm;
00438                         }
00439 
00440                         // if this is a request answer with an summary vector
00441                         if (ecm.type == EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR)
00442                         {
00443                                 // create a new request for the summary vector of the neighbor
00444                                 EpidemicControlMessage response_ecm;
00445 
00446                                 // set message type
00447                                 response_ecm.type = EpidemicControlMessage::ECM_RESPONSE;
00448 
00449                                 // add own summary vector to the message
00450                                 const SummaryVector vec = (**this).getSummaryVector();
00451                                 response_ecm.setSummaryVector(vec);
00452 
00453                                 // add own purge vector to the message
00454                                 response_ecm.setPurgeVector(_purge_vector);
00455 
00456                                 // create a new bundle
00457                                 dtn::data::Bundle answer;
00458 
00459                                 // set the source of the bundle
00460                                 answer._source = dtn::core::BundleCore::local + "/routing/epidemic";
00461 
00462                                 // set the destination of the bundle
00463                                 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00464                                 answer._destination = bundle._source;
00465 
00466                                 // limit the lifetime to 60 seconds
00467                                 answer._lifetime = 60;
00468 
00469                                 // set high priority
00470                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true);
00471                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00472 
00473                                 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>();
00474                                 ibrcommon::BLOB::Reference ref = p.getBLOB();
00475 
00476                                 // serialize the request into the payload
00477                                 {
00478                                         ibrcommon::BLOB::iostream ios = ref.iostream();
00479                                         (*ios) << response_ecm;
00480                                 }
00481 
00482                                 // transfer the bundle to the neighbor
00483                                 dtn::core::BundleGeneratedEvent::raise(answer);
00484                         }
00485                         else if (ecm.type == EpidemicControlMessage::ECM_RESPONSE)
00486                         {
00487                                 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_SUMMARY_VECTOR)
00488                                 {
00489                                         // get the summary vector (bloomfilter) of this ECM
00490                                         const ibrcommon::BloomFilter &filter = ecm.getSummaryVector().getBloomFilter();
00491 
00497                                         try {
00498                                                 NeighborDatabase &db = (**this).getNeighborDB();
00499                                                 ibrcommon::MutexLock l(db);
00500                                                 NeighborDatabase::NeighborEntry &entry = db.get(bundle._source.getNodeEID());
00501                                                 entry.update(filter, bundle._lifetime);
00502 
00503                                                 // trigger the search-for-next-bundle procedure
00504                                                 _taskqueue.push( new SearchNextBundleTask( origin ) );
00505                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00506                                 }
00507 
00508                                 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_PURGE_VECTOR)
00509                                 {
00510                                         // get the purge vector (bloomfilter) of this ECM
00511                                         const ibrcommon::BloomFilter &purge = ecm.getPurgeVector().getBloomFilter();
00512 
00513                                         dtn::core::BundleStorage &storage = (**this).getStorage();
00514 
00515                                         try {
00516                                                 while (true)
00517                                                 {
00518                                                         // delete bundles in the purge vector
00519                                                         const dtn::data::MetaBundle meta = storage.remove(purge);
00520 
00521                                                         IBRCOMMON_LOGGER_DEBUG(15) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00522 
00523                                                         // add this bundle to the own purge vector
00524                                                         _purge_vector.add(meta);
00525                                                 }
00526                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00527                                 }
00528                         }
00529                 }
00530         }
00531 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1