IBR-DTNSuite 0.6

daemon/src/routing/epidemic/EpidemicRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * EpidemicRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/epidemic/EpidemicRoutingExtension.h"
00009 #include "routing/epidemic/EpidemicControlMessage.h"
00010 
00011 #include "routing/QueueBundleEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/BundleExpiredEvent.h"
00015 #include "core/NodeEvent.h"
00016 #include "core/TimeEvent.h"
00017 #include "core/Node.h"
00018 #include "net/ConnectionManager.h"
00019 #include "net/ConnectionEvent.h"
00020 #include "Configuration.h"
00021 #include "core/BundleCore.h"
00022 #include "core/BundleEvent.h"
00023 
00024 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00025 #include <ibrdtn/data/MetaBundle.h>
00026 #include <ibrcommon/thread/MutexLock.h>
00027 #include <ibrcommon/Logger.h>
00028 #include <ibrcommon/AutoDelete.h>
00029 
00030 #include <functional>
00031 #include <list>
00032 #include <algorithm>
00033 
00034 #include <iomanip>
00035 #include <ios>
00036 #include <iostream>
00037 #include <set>
00038 
00039 #include <stdlib.h>
00040 #include <typeinfo>
00041 
00042 namespace dtn
00043 {
00044         namespace routing
00045         {
00046                 EpidemicRoutingExtension::EpidemicRoutingExtension()
00047                  : _endpoint(_taskqueue, _purge_vector)
00048                 {
00049                         // write something to the syslog
00050                         IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
00051                 }
00052 
00053                 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00054                 {
00055                         stop();
00056                         join();
00057                 }
00058 
00059                 void EpidemicRoutingExtension::stopExtension()
00060                 {
00061                         _taskqueue.abort();
00062                 }
00063 
00064                 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00065                 {
00066                         // If an incoming bundle is received, forward it to all connected neighbors
00067                         try {
00068                                 dynamic_cast<const QueueBundleEvent&>(*evt);
00069 
00070                                 // new bundles trigger a recheck for all neighbors
00071                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00072 
00073                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00074                                 {
00075                                         const dtn::core::Node &n = (*iter);
00076 
00077                                         // transfer the next bundle to this destination
00078                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00079                                 }
00080                                 return;
00081                         } catch (const std::bad_cast&) { };
00082 
00083                         // On each time event look for expired stuff
00084                         try {
00085                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00086 
00087                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00088                                 {
00089                                         // check all lists for expired entries
00090                                         _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00091                                 }
00092                                 return;
00093                         } catch (const std::bad_cast&) { };
00094 
00095                         // If a new neighbor comes available, send him a request for the summary vector
00096                         // If a neighbor went away we can free the stored summary vector
00097                         try {
00098                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00099                                 const dtn::core::Node &n = nodeevent.getNode();
00100 
00101                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00102                                 {
00103                                         // query a new summary vector from this neighbor
00104                                         _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), n.getEID(), _endpoint ) );
00105                                 }
00106 
00107                                 return;
00108                         } catch (const std::bad_cast&) { };
00109 
00110                         try {
00111                                 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt);
00112 
00113                                 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP)
00114                                 {
00115                                         // query a new summary vector from this neighbor
00116                                         _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), ce.peer, _endpoint ) );
00117                                 }
00118                                 return;
00119                         } catch (const std::bad_cast&) { };
00120 
00121                         // The bundle transfer has been aborted
00122                         try {
00123                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00124 
00125                                 // transfer the next bundle to this destination
00126                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00127 
00128                                 return;
00129                         } catch (const std::bad_cast&) { };
00130 
00131                         // A bundle transfer was successful
00132                         try {
00133                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00134 
00135                                 // create a transfer completed task
00136                                 _taskqueue.push( new TransferCompletedTask( completed.getPeer(), completed.getBundle() ) );
00137                                 return;
00138                         } catch (const std::bad_cast&) { };
00139                 }
00140 
00141                 bool EpidemicRoutingExtension::__cancellation()
00142                 {
00143                         _taskqueue.abort();
00144                         return true;
00145                 }
00146 
00147                 void EpidemicRoutingExtension::run()
00148                 {
00149                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00150                         {
00151                         public:
00152                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00153                                  : _entry(entry)
00154                                 {};
00155 
00156                                 virtual ~BundleFilter() {};
00157 
00158                                 virtual size_t limit() const { return 10; };
00159 
00160                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00161                                 {
00162                                         // check Scope Control Block - do not forward bundles with hop limit == 0
00163                                         if (meta.hopcount == 0)
00164                                         {
00165                                                 return false;
00166                                         }
00167 
00168                                         // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
00169                                         if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)))
00170                                         {
00171                                                 return false;
00172                                         }
00173 
00174                                         // do not forward any epidemic control message
00175                                         // this is done by the neighbor routing module
00176                                         if (meta.source == (dtn::core::BundleCore::local + "/routing/epidemic"))
00177                                         {
00178                                                 return false;
00179                                         }
00180 
00181                                         // do not forward to any blacklisted destination
00182                                         const dtn::data::EID dest = meta.destination.getNode();
00183                                         if (_blacklist.find(dest) != _blacklist.end())
00184                                         {
00185                                                 return false;
00186                                         }
00187 
00188                                         // do not forward bundles already known by the destination
00189                                         // throws BloomfilterNotAvailableException if no filter is available or it is expired
00190                                         if (_entry.has(meta, true))
00191                                         {
00192                                                 return false;
00193                                         }
00194 
00195                                         return true;
00196                                 };
00197 
00198                                 void blacklist(const dtn::data::EID& id)
00199                                 {
00200                                         _blacklist.insert(id);
00201                                 };
00202 
00203                         private:
00204                                 std::set<dtn::data::EID> _blacklist;
00205                                 const NeighborDatabase::NeighborEntry &_entry;
00206                         };
00207 
00208                         dtn::core::BundleStorage &storage = (**this).getStorage();
00209 
00210                         while (true)
00211                         {
00212                                 try {
00213                                         Task *t = _taskqueue.getnpop(true);
00214                                         ibrcommon::AutoDelete<Task> killer(t);
00215 
00216                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing epidemic task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00217 
00218                                         try {
00222                                                 try {
00223                                                         ExecutableTask &etask = dynamic_cast<ExecutableTask&>(*t);
00224                                                         etask.execute();
00225                                                 } catch (const std::bad_cast&) { };
00226 
00230                                                 try {
00231                                                         ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00232                                                         _purge_vector.expire(task.timestamp);
00233                                                 } catch (const std::bad_cast&) { };
00234 
00240                                                 try {
00241                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00242                                                         NeighborDatabase &db = (**this).getNeighborDB();
00243 
00244                                                         ibrcommon::MutexLock l(db);
00245                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00246 
00247                                                         try {
00248                                                                 // get the bundle filter of the neighbor
00249                                                                 BundleFilter filter(entry);
00250 
00251                                                                 // some debug output
00252                                                                 IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00253 
00254                                                                 // blacklist the neighbor itself, because this is handled by neighbor routing extension
00255                                                                 filter.blacklist(task.eid);
00256 
00257                                                                 // query some unknown bundle from the storage, the list contains max. 10 items.
00258                                                                 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00259 
00260                                                                 // send the bundles as long as we have resources
00261                                                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00262                                                                 {
00263                                                                         try {
00264                                                                                 // transfer the bundle to the neighbor
00265                                                                                 transferTo(entry, *iter);
00266                                                                         } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00267                                                                 }
00268                                                         } catch (const NeighborDatabase::BloomfilterNotAvailableException&) {
00269                                                                 // query a new summary vector from this neighbor
00270                                                                 _taskqueue.push( new QuerySummaryVectorTask( (**this).getNeighborDB(), task.eid, _endpoint ) );
00271                                                         }
00272                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00273                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00274                                                 } catch (const std::bad_cast&) { };
00275 
00279                                                 try {
00280                                                         TransferCompletedTask &task = dynamic_cast<TransferCompletedTask&>(*t);
00281 
00282                                                         try {
00283                                                                 // add this bundle to the purge vector if it is delivered to its destination
00284                                                                 if (( task.peer.getNode() == task.meta.destination.getNode() ) && (task.meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00285                                                                 {
00286                                                                         IBRCOMMON_LOGGER(notice) << "singleton bundle added to purge vector: " << task.meta.toString() << IBRCOMMON_LOGGER_ENDL;
00287 
00288                                                                         // add it to the purge vector
00289                                                                         _purge_vector.add(task.meta);
00290                                                                 }
00291                                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00292 
00293                                                         // transfer the next bundle to this destination
00294                                                         _taskqueue.push( new SearchNextBundleTask( task.peer ) );
00295                                                 } catch (const std::bad_cast&) { };
00296 
00300                                                 try {
00301                                                         const ProcessEpidemicBundleTask &task = dynamic_cast<ProcessEpidemicBundleTask&>(*t);
00302                                                         processECM(task.bundle);
00303                                                 } catch (const std::bad_cast&) { };
00304 
00305                                         } catch (const ibrcommon::Exception &ex) {
00306                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in EpidemicRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00307                                         }
00308                                 } catch (const std::exception&) {
00309                                         return;
00310                                 }
00311 
00312                                 yield();
00313                         }
00314                 }
00315 
00316                 /****************************************/
00317 
00318                 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00319                  : eid(e)
00320                 { }
00321 
00322                 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00323                 { }
00324 
00325                 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00326                 {
00327                         return "SearchNextBundleTask: " + eid.getString();
00328                 }
00329 
00330                 /****************************************/
00331 
00332                 EpidemicRoutingExtension::ProcessEpidemicBundleTask::ProcessEpidemicBundleTask(const dtn::data::Bundle &b)
00333                  : bundle(b)
00334                 { }
00335 
00336                 EpidemicRoutingExtension::ProcessEpidemicBundleTask::~ProcessEpidemicBundleTask()
00337                 { }
00338 
00339                 std::string EpidemicRoutingExtension::ProcessEpidemicBundleTask::toString()
00340                 {
00341                         return "ProcessEpidemicBundleTask: " + bundle.toString();
00342                 }
00343 
00344                 /****************************************/
00345 
00346                 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00347                  : timestamp(t)
00348                 { }
00349 
00350                 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00351                 { }
00352 
00353                 std::string EpidemicRoutingExtension::ExpireTask::toString()
00354                 {
00355                         return "ExpireTask";
00356                 }
00357 
00358                 /****************************************/
00359 
00360                 EpidemicRoutingExtension::TransferCompletedTask::TransferCompletedTask(const dtn::data::EID &e, const dtn::data::MetaBundle &m)
00361                  : peer(e), meta(m)
00362                 { }
00363 
00364                 EpidemicRoutingExtension::TransferCompletedTask::~TransferCompletedTask()
00365                 { }
00366 
00367                 std::string EpidemicRoutingExtension::TransferCompletedTask::toString()
00368                 {
00369                         return "TransferCompletedTask";
00370                 }
00371 
00372                 /****************************************/
00373 
00374                 EpidemicRoutingExtension::QuerySummaryVectorTask::QuerySummaryVectorTask(NeighborDatabase &db, const dtn::data::EID &o, EpidemicEndpoint &e)
00375                  : ndb(db), origin(o), endpoint(e)
00376                 { }
00377 
00378                 EpidemicRoutingExtension::QuerySummaryVectorTask::~QuerySummaryVectorTask()
00379                 { }
00380 
00381                 void EpidemicRoutingExtension::QuerySummaryVectorTask::execute() const
00382                 {
00383                         try {
00384                                 // lock the list of neighbors
00385                                 ibrcommon::MutexLock l(ndb);
00386                                 NeighborDatabase::NeighborEntry &entry = ndb.get(origin);
00387 
00388                                 // acquire resources to send a summary vector request
00389                                 entry.acquireFilterRequest();
00390 
00391                                 // call the query method at the epidemic endpoint instance
00392                                 endpoint.query(origin);
00393                         } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00394                         } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00395                 }
00396 
00397                 std::string EpidemicRoutingExtension::QuerySummaryVectorTask::toString()
00398                 {
00399                         return "QuerySummaryVectorTask: " + origin.getString();
00400                 }
00401 
00402                 EpidemicRoutingExtension::EpidemicEndpoint::EpidemicEndpoint(ibrcommon::Queue<EpidemicRoutingExtension::Task* > &queue, dtn::routing::BundleSummary &purge)
00403                  : _taskqueue(queue), _purge_vector(purge)
00404                 {
00405                         AbstractWorker::initialize("/routing/epidemic", true);
00406                 }
00407 
00408                 EpidemicRoutingExtension::EpidemicEndpoint::~EpidemicEndpoint()
00409                 {
00410                 }
00411 
00412                 void EpidemicRoutingExtension::EpidemicEndpoint::callbackBundleReceived(const Bundle &b)
00413                 {
00414                         _taskqueue.push( new ProcessEpidemicBundleTask(b) );
00415                 }
00416 
00417                 void EpidemicRoutingExtension::EpidemicEndpoint::send(const dtn::data::Bundle &b)
00418                 {
00419                         transmit(b);
00420                 }
00421 
00422                 void EpidemicRoutingExtension::EpidemicEndpoint::query(const dtn::data::EID &origin)
00423                 {
00424                         // create a new request for the summary vector of the neighbor
00425                         EpidemicControlMessage ecm;
00426 
00427                         // set message type
00428                         ecm.type = EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR;
00429 
00430                         // create a new bundle
00431                         dtn::data::Bundle req;
00432 
00433                         // set the source of the bundle
00434                         req._source = dtn::core::BundleCore::local + "/routing/epidemic";
00435 
00436                         // set the destination of the bundle
00437                         req.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00438                         req._destination = origin + "/routing/epidemic";
00439 
00440                         // limit the lifetime to 60 seconds
00441                         req._lifetime = 60;
00442 
00443                         // set high priority
00444                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00445                         req.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00446 
00447                         dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
00448                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00449 
00450                         // serialize the request into the payload
00451                         {
00452                                 ibrcommon::BLOB::iostream ios = ref.iostream();
00453                                 (*ios) << ecm;
00454                         }
00455 
00456                         // add a schl block
00457                         dtn::data::ScopeControlHopLimitBlock &schl = req.push_front<dtn::data::ScopeControlHopLimitBlock>();
00458                         schl.setLimit(1);
00459 
00460                         // send the bundle
00461                         transmit(req);
00462                 }
00463 
00464                 void EpidemicRoutingExtension::processECM(const dtn::data::Bundle &bundle)
00465                 {
00466                         // read the ecm
00467                         const dtn::data::PayloadBlock &p = bundle.getBlock<dtn::data::PayloadBlock>();
00468                         ibrcommon::BLOB::Reference ref = p.getBLOB();
00469                         EpidemicControlMessage ecm;
00470 
00471                         // locked within this region
00472                         {
00473                                 ibrcommon::BLOB::iostream s = ref.iostream();
00474                                 (*s) >> ecm;
00475                         }
00476 
00477                         // if this is a request answer with an summary vector
00478                         if (ecm.type == EpidemicControlMessage::ECM_QUERY_SUMMARY_VECTOR)
00479                         {
00480                                 // create a new request for the summary vector of the neighbor
00481                                 EpidemicControlMessage response_ecm;
00482 
00483                                 // set message type
00484                                 response_ecm.type = EpidemicControlMessage::ECM_RESPONSE;
00485 
00486                                 // add own summary vector to the message
00487                                 const SummaryVector vec = (**this).getSummaryVector();
00488                                 response_ecm.setSummaryVector(vec);
00489 
00490                                 // add own purge vector to the message
00491                                 response_ecm.setPurgeVector(_purge_vector);
00492 
00493                                 // create a new bundle
00494                                 dtn::data::Bundle answer;
00495 
00496                                 // set the source of the bundle
00497                                 answer._source = dtn::core::BundleCore::local + "/routing/epidemic";
00498 
00499                                 // set the destination of the bundle
00500                                 answer.set(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON, true);
00501                                 answer._destination = bundle._source;
00502 
00503                                 // limit the lifetime to 60 seconds
00504                                 answer._lifetime = 60;
00505 
00506                                 // set high priority
00507                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT1, false);
00508                                 answer.set(dtn::data::PrimaryBlock::PRIORITY_BIT2, true);
00509 
00510                                 dtn::data::PayloadBlock &p = answer.push_back<PayloadBlock>();
00511                                 ibrcommon::BLOB::Reference ref = p.getBLOB();
00512 
00513                                 // serialize the request into the payload
00514                                 {
00515                                         ibrcommon::BLOB::iostream ios = ref.iostream();
00516                                         (*ios) << response_ecm;
00517                                 }
00518 
00519                                 // add a schl block
00520                                 dtn::data::ScopeControlHopLimitBlock &schl = answer.push_front<dtn::data::ScopeControlHopLimitBlock>();
00521                                 schl.setLimit(1);
00522 
00523                                 // transfer the bundle to the neighbor
00524                                 _endpoint.send(answer);
00525                         }
00526                         else if (ecm.type == EpidemicControlMessage::ECM_RESPONSE)
00527                         {
00528                                 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_SUMMARY_VECTOR)
00529                                 {
00530                                         // get the summary vector (bloomfilter) of this ECM
00531                                         const ibrcommon::BloomFilter &filter = ecm.getSummaryVector().getBloomFilter();
00532 
00538                                         try {
00539                                                 NeighborDatabase &db = (**this).getNeighborDB();
00540                                                 ibrcommon::MutexLock l(db);
00541                                                 NeighborDatabase::NeighborEntry &entry = db.get(bundle._source.getNode());
00542                                                 entry.update(filter, bundle._lifetime);
00543 
00544                                                 // trigger the search-for-next-bundle procedure
00545                                                 _taskqueue.push( new SearchNextBundleTask( entry.eid ) );
00546                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) { };
00547                                 }
00548 
00549                                 if (ecm.flags & EpidemicControlMessage::ECM_CONTAINS_PURGE_VECTOR)
00550                                 {
00551                                         // get the purge vector (bloomfilter) of this ECM
00552                                         const ibrcommon::BloomFilter &purge = ecm.getPurgeVector().getBloomFilter();
00553 
00554                                         dtn::core::BundleStorage &storage = (**this).getStorage();
00555 
00556                                         try {
00557                                                 while (true)
00558                                                 {
00559                                                         // delete bundles in the purge vector
00560                                                         const dtn::data::MetaBundle meta = storage.remove(purge);
00561 
00562                                                         // log the purged bundle
00563                                                         IBRCOMMON_LOGGER(notice) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00564 
00565                                                         // gen a report
00566                                                         dtn::core::BundleEvent::raise(meta, BUNDLE_DELETED, StatusReportBlock::DEPLETED_STORAGE);
00567 
00568                                                         // add this bundle to the own purge vector
00569                                                         _purge_vector.add(meta);
00570                                                 }
00571                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00572                                 }
00573                         }
00574                 }
00575         }
00576 }