IBR-DTNSuite 0.6

daemon/src/routing/epidemic/EpidemicRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * EpidemicRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/epidemic/EpidemicRoutingExtension.h"
00009 #include "routing/epidemic/EpidemicControlMessage.h"
00010 
00011 #include "routing/QueueBundleEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/BundleExpiredEvent.h"
00015 #include "core/NodeEvent.h"
00016 #include "core/TimeEvent.h"
00017 #include "core/Node.h"
00018 #include "net/ConnectionManager.h"
00019 #include "Configuration.h"
00020 #include "core/BundleCore.h"
00021 #include "core/BundleEvent.h"
00022 
00023 #include <ibrdtn/data/MetaBundle.h>
00024 #include <ibrcommon/thread/MutexLock.h>
00025 #include <ibrcommon/Logger.h>
00026 #include <ibrcommon/AutoDelete.h>
00027 
00028 #include <functional>
00029 #include <list>
00030 #include <algorithm>
00031 
00032 #include <iomanip>
00033 #include <ios>
00034 #include <iostream>
00035 #include <set>
00036 
00037 #include <stdlib.h>
00038 #include <typeinfo>
00039 
00040 namespace dtn
00041 {
00042         namespace routing
00043         {
00044                 EpidemicRoutingExtension::EpidemicRoutingExtension()
00045                  : _endpoint(_taskqueue, _purge_vector)
00046                 {
00047                         // write something to the syslog
00048                         IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
00049                 }
00050 
00051                 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00052                 {
00053                         stop();
00054                         join();
00055                 }
00056 
00057                 void EpidemicRoutingExtension::stopExtension()
00058                 {
00059                         _taskqueue.abort();
00060                 }
00061 
00062                 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00063                 {
00064                         // If an incoming bundle is received, forward it to all connected neighbors
00065                         try {
00066                                 dynamic_cast<const QueueBundleEvent&>(*evt);
00067 
00068                                 // new bundles trigger a recheck for all neighbors
00069                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00070 
00071                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00072                                 {
00073                                         const dtn::core::Node &n = (*iter);
00074 
00075                                         // transfer the next bundle to this destination
00076                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00077                                 }
00078                                 return;
00079                         } catch (const std::bad_cast&) { };
00080 
00081                         // On each time event look for expired stuff
00082                         try {
00083                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00084 
00085                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00086                                 {
00087                                         // check all lists for expired entries
00088                                         _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00089                                 }
00090                                 return;
00091                         } catch (const std::bad_cast&) { };
00092 
00093                         // If a new neighbor comes available, send him a request for the summary vector
00094                         // If a neighbor went away we can free the stored summary vector
00095                         try {
00096                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00097                                 const dtn::core::Node &n = nodeevent.getNode();
00098 
00099                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00100                                 {
00101                                         // query a new summary vector from this neighbor
00102                                         _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), n.getEID(), _endpoint ) );
00103                                 }
00104 
00105                                 return;
00106                         } catch (const std::bad_cast&) { };
00107 
00108                         // The bundle transfer has been aborted
00109                         try {
00110                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00111 
00112                                 // transfer the next bundle to this destination
00113                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00114 
00115                                 return;
00116                         } catch (const std::bad_cast&) { };
00117 
00118                         // A bundle transfer was successful
00119                         try {
00120                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00121 
00122                                 // create a transfer completed task
00123                                 _taskqueue.push( new TransferCompletedTask( completed.getPeer(), completed.getBundle() ) );
00124                                 return;
00125                         } catch (const std::bad_cast&) { };
00126                 }
00127 
00128                 bool EpidemicRoutingExtension::__cancellation()
00129                 {
00130                         _taskqueue.abort();
00131                         return true;
00132                 }
00133 
00134                 void EpidemicRoutingExtension::run()
00135                 {
00136                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00137                         {
00138                         public:
00139                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00140                                  : _entry(entry)
00141                                 {};
00142 
00143                                 virtual ~BundleFilter() {};
00144 
00145                                 virtual size_t limit() const { return 10; };
00146 
00147                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00148                                 {
00149                                         // do not forward any epidemic control message
00150                                         // this is done by the neighbor routing module
00151                                         if (meta.source == (dtn::core::BundleCore::local + "/routing/epidemic"))
00152                                         {
00153                                                 return false;
00154                                         }
00155 
00156                                         // do not forward to any blacklisted destination
00157                                         const dtn::data::EID dest = meta.destination.getNode();
00158                                         if (_blacklist.find(dest) != _blacklist.end())
00159                                         {
00160                                                 return false;
00161                                         }
00162 
00163                                         // do not forward bundles already known by the destination
00164                                         // throws BloomfilterNotAvailableException if no filter is available or it is expired
00165                                         if (_entry.has(meta, true))
00166                                         {
00167                                                 return false;
00168                                         }
00169 
00170                                         return true;
00171                                 };
00172 
00173                                 void blacklist(const dtn::data::EID& id)
00174                                 {
00175                                         _blacklist.insert(id);
00176                                 };
00177 
00178                         private:
00179                                 std::set<dtn::data::EID> _blacklist;
00180                                 const NeighborDatabase::NeighborEntry &_entry;
00181                         };
00182 
00183                         dtn::core::BundleStorage &storage = (**this).getStorage();
00184 
00185                         while (true)
00186                         {
00187                                 try {
00188                                         Task *t = _taskqueue.getnpop(true);
00189                                         ibrcommon::AutoDelete<Task> killer(t);
00190 
00191                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00192 
00193                                         try {
00197                                                 try {
00198                                                         ExecutableTask &etask = dynamic_cast<ExecutableTask&>(*t);
00199                                                         etask.execute();
00200                                                 } catch (const std::bad_cast&) { };
00201 
00205                                                 try {
00206                                                         ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00207                                                         _purge_vector.expire(task.timestamp);
00208                                                 } catch (const std::bad_cast&) { };
00209 
00215                                                 try {
00216                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00217                                                         NeighborDatabase &db = (**this).getNeighborDB();
00218 
00219                                                         ibrcommon::MutexLock l(db);
00220                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00221 
00222                                                         try {
00223                                                                 // get the bundle filter of the neighbor
00224                                                                 BundleFilter filter(entry);
00225 
00226                                                                 // some debug output
00227                                                                 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00228 
00229                                                                 // blacklist the neighbor itself, because this is handled by neighbor routing extension
00230                                                                 filter.blacklist(task.eid);
00231 
00232                                                                 // query some unknown bundle from the storage, the list contains max. 10 items.
00233                                                                 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00234 
00235                                                                 // send the bundles as long as we have resources
00236                                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00237                                                                 {
00238                                                                         try {
00239                                                                                 // transfer the bundle to the neighbor
00240                                                                                 transferTo(entry, *iter);
00241                                                                         } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00242                                                                 }
00243                                                         } catch (const NeighborDatabase::BloomfilterNotAvailableException&) {
00244                                                                 // query a new summary vector from this neighbor
00245                                                                 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), task.eid, _endpoint ) );
00246                                                         }
00247                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00248                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00249                                                 } catch (const std::bad_cast&) { };
00250 
00254                                                 try {
00255                                                         TransferCompletedTask &task = dynamic_cast<TransferCompletedTask&>(*t);
00256 
00257                                                         try {
00258                                                                 // add this bundle to the purge vector if it is delivered to its destination
00259                                                                 if (( task.peer.getNode() == task.meta.destination.getNode() ) && (task.meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00260                                                                 {
00261                                                                         IBRCOMMON_LOGGER(notice) << "singleton bundle added to purge vector: " << task.meta.toString() << IBRCOMMON_LOGGER_ENDL;
00262 
00263                                                                         // add it to the purge vector
00264                                                                         _purge_vector.add(task.meta);
00265                                                                 }
00266                                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00267 
00268                                                         // transfer the next bundle to this destination
00269                                                         _taskqueue.push( new SearchNextBundleTask( task.peer ) );
00270                                                 } catch (const std::bad_cast&) { };
00271 
00275                                                 try {
00276                                                         const ProcessEpidemicBundleTask &task = dynamic_cast<ProcessEpidemicBundleTask&>(*t);
00277                                                         processECM(task.bundle);
00278                                                 } catch (const std::bad_cast&) { };
00279 
00280                                         } catch (const ibrcommon::Exception &ex) {
00281                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00282                                         }
00283                                 } catch (const std::exception&) {
00284                                         return;
00285                                 }
00286 
00287                                 yield();
00288                         }
00289                 }
00290 
00291                 /****************************************/
00292 
00293                 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00294                  : eid(e)
00295                 { }
00296 
00297                 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00298                 { }
00299 
00300                 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00301                 {
00302                         return "SearchNextBundleTask: " + eid.getString();
00303                 }
00304 
00305                 /****************************************/
00306 
00307                 EpidemicRoutingExtension::ProcessEpidemicBundleTask::ProcessEpidemicBundleTask(const dtn::data::Bundle &b)
00308                  : bundle(b)
00309                 { }
00310 
00311                 EpidemicRoutingExtension::ProcessEpidemicBundleTask::~ProcessEpidemicBundleTask()
00312                 { }
00313 
00314                 std::string EpidemicRoutingExtension::ProcessEpidemicBundleTask::toString()
00315                 {
00316                         return "ProcessEpidemicBundleTask: " + bundle.toString();
00317                 }
00318 
00319                 /****************************************/
00320 
00321                 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00322                  : timestamp(t)
00323                 { }
00324 
00325                 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00326                 { }
00327 
00328                 std::string EpidemicRoutingExtension::ExpireTask::toString()
00329                 {
00330                         return "ExpireTask";
00331                 }
00332 
00333                 /****************************************/
00334 
00335                 EpidemicRoutingExtension::TransferCompletedTask::TransferCompletedTask(const dtn::data::EID &e, const dtn::data::MetaBundle &m)
00336                  : peer(e), meta(m)
00337                 { }
00338 
00339                 EpidemicRoutingExtension::TransferCompletedTask::~TransferCompletedTask()
00340                 { }
00341 
00342                 std::string EpidemicRoutingExtension::TransferCompletedTask::toString()
00343                 {
00344                         return "TransferCompletedTask";
00345                 }
00346 
00347                 /****************************************/
00348 
00349                 EpidemicRoutingExtension::QuerySummaryVectorTask::QuerySummaryVectorTask(NeighborDatabase &db, const dtn::data::EID &o, EpidemicEndpoint &e)
00350                  : ndb(db), origin(o), endpoint(e)
00351                 { }
00352 
00353                 EpidemicRoutingExtension::QuerySummaryVectorTask::~QuerySummaryVectorTask()
00354                 { }
00355 
00356                 void EpidemicRoutingExtension::QuerySummaryVectorTask::execute() const
00357                 {
00358                         try {
00359                                 // lock the list of neighbors
00360                                 ibrcommon::MutexLock l(ndb);
00361                                 NeighborDatabase::NeighborEntry &entry = ndb.get(origin);
00362 
00363                                 // acquire resources to send a summary vector request
00364                                 entry.acquireFilterRequest();
00365 
00366                                 // call the query method at the epidemic endpoint instance
00367                                 endpoint.query(origin);
00368                         } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00369                         } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00370                 }
00371 
00372                 std::string EpidemicRoutingExtension::QuerySummaryVectorTask::toString()
00373                 {
00374                         return "QuerySummaryVectorTask: " + origin.getString();
00375                 }
00376 
00377                 EpidemicRoutingExtension::EpidemicEndpoint::EpidemicEndpoint(ibrcommon::Queue<EpidemicRoutingExtension::Task* > &queue, dtn::routing::BundleSummary &purge)
00378                  : _taskqueue(queue), _purge_vector(purge)
00379                 {
00380                         AbstractWorker::initialize("/routing/epidemic", true);
00381                 }
00382 
00383                 EpidemicRoutingExtension::EpidemicEndpoint::~EpidemicEndpoint()
00384                 {
00385                 }
00386 
00387                 void EpidemicRoutingExtension::EpidemicEndpoint::callbackBundleReceived(const Bundle &b)
00388                 {
00389                         _taskqueue.push( new ProcessEpidemicBundleTask(b) );
00390                 }
00391 
00392                 void EpidemicRoutingExtension::EpidemicEndpoint::send(const dtn::data::Bundle &b)
00393                 {
00394                         transmit(b);
00395                 }
00396 
00397                 void EpidemicRoutingExtension::EpidemicEndpoint::query(const dtn::data::EID &origin)
00398                 {
00399                         // create a new request for the summary vector of the neighbor
00400                         EpidemicControlMessage ecm;
00401 
00402                         // set message type
00403                         ecm.type = EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR;
00404 
00405                         // create a new bundle
00406                         dtn::data::Bundle req;
00407 
00408                         // set the source of the bundle
00409                         req._source = dtn::core::BundleCore::local + "/routing/epidemic";
00410 
00411                         // set the destination of the bundle
00412                         req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00413                         req._destination = origin + "/routing/epidemic";
00414 
00415                         // limit the lifetime to 60 seconds
00416                         req._lifetime = 60;
00417 
00418                         // set high priority
00419                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true);
00420                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00421 
00422                         dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
00423                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00424 
00425                         // serialize the request into the payload
00426                         {
00427                                 ibrcommon::BLOB::iostream ios = ref.iostream();
00428                                 (*ios) << ecm;
00429                         }
00430 
00431                         // send the bundle
00432                         transmit(req);
00433                 }
00434 
00435                 void EpidemicRoutingExtension::processECM(const dtn::data::Bundle &bundle)
00436                 {
00437                         // read the ecm
00438                         const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>();
00439                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00440                         EpidemicControlMessage ecm;
00441 
00442                         // locked within this region
00443                         {
00444                                 ibrcommon::BLOB::iostream s = ref.iostream();
00445                                 (*s) >> ecm;
00446                         }
00447 
00448                         // if this is a request answer with an summary vector
00449                         if (ecm.type == EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR)
00450                         {
00451                                 // create a new request for the summary vector of the neighbor
00452                                 EpidemicControlMessage response_ecm;
00453 
00454                                 // set message type
00455                                 response_ecm.type = EpidemicControlMessage::ECM_RESPONSE;
00456 
00457                                 // add own summary vector to the message
00458                                 const SummaryVector vec = (**this).getSummaryVector();
00459                                 response_ecm.setSummaryVector(vec);
00460 
00461                                 // add own purge vector to the message
00462                                 response_ecm.setPurgeVector(_purge_vector);
00463 
00464                                 // create a new bundle
00465                                 dtn::data::Bundle answer;
00466 
00467                                 // set the source of the bundle
00468                                 answer._source = dtn::core::BundleCore::local + "/routing/epidemic";
00469 
00470                                 // set the destination of the bundle
00471                                 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00472                                 answer._destination = bundle._source;
00473 
00474                                 // limit the lifetime to 60 seconds
00475                                 answer._lifetime = 60;
00476 
00477                                 // set high priority
00478                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, true);
00479                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00480 
00481                                 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>();
00482                                 ibrcommon::BLOB::Reference ref = p.getBLOB();
00483 
00484                                 // serialize the request into the payload
00485                                 {
00486                                         ibrcommon::BLOB::iostream ios = ref.iostream();
00487                                         (*ios) << response_ecm;
00488                                 }
00489 
00490                                 // transfer the bundle to the neighbor
00491                                 _endpoint.send(answer);
00492                         }
00493                         else if (ecm.type == EpidemicControlMessage::ECM_RESPONSE)
00494                         {
00495                                 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_SUMMARY_VECTOR)
00496                                 {
00497                                         // get the summary vector (bloomfilter) of this ECM
00498                                         const ibrcommon::BloomFilter &filter = ecm.getSummaryVector().getBloomFilter();
00499 
00505                                         try {
00506                                                 NeighborDatabase &db = (**this).getNeighborDB();
00507                                                 ibrcommon::MutexLock l(db);
00508                                                 NeighborDatabase::NeighborEntry &entry = db.get(bundle._source.getNode());
00509                                                 entry.update(filter, bundle._lifetime);
00510 
00511                                                 // trigger the search-for-next-bundle procedure
00512                                                 _taskqueue.push( new SearchNextBundleTask( entry.eid ) );
00513                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00514                                 }
00515 
00516                                 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_PURGE_VECTOR)
00517                                 {
00518                                         // get the purge vector (bloomfilter) of this ECM
00519                                         const ibrcommon::BloomFilter &purge = ecm.getPurgeVector().getBloomFilter();
00520 
00521                                         dtn::core::BundleStorage &storage = (**this).getStorage();
00522 
00523                                         try {
00524                                                 while (true)
00525                                                 {
00526                                                         // delete bundles in the purge vector
00527                                                         const dtn::data::MetaBundle meta = storage.remove(purge);
00528 
00529                                                         // log the purged bundle
00530                                                         IBRCOMMON_LOGGER(notice) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00531 
00532                                                         // gen a report
00533                                                         dtn::core::BundleEvent::raise(meta, BUNDLE_DELETED, StatusReportBlock::DEPLETED_STORAGE);
00534 
00535                                                         // add this bundle to the own purge vector
00536                                                         _purge_vector.add(meta);
00537                                                 }
00538                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00539                                 }
00540                         }
00541                 }
00542         }
00543 }