IBR-DTNSuite 0.6

daemon/src/routing/flooding/FloodRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * FloodRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/flooding/FloodRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "core/NodeEvent.h"
00011 #include "net/TransferCompletedEvent.h"
00012 #include "net/TransferAbortedEvent.h"
00013 #include "net/ConnectionEvent.h"
00014 #include "core/Node.h"
00015 #include "net/ConnectionManager.h"
00016 #include "Configuration.h"
00017 #include "core/BundleCore.h"
00018 
00019 #include <ibrdtn/data/MetaBundle.h>
00020 #include <ibrcommon/thread/MutexLock.h>
00021 #include <ibrcommon/Logger.h>
00022 #include <ibrcommon/AutoDelete.h>
00023 
00024 #include <functional>
00025 #include <list>
00026 #include <algorithm>
00027 #include <iomanip>
00028 #include <ios>
00029 #include <iostream>
00030 
00031 #include <stdlib.h>
00032 #include <typeinfo>
00033 
00034 namespace dtn
00035 {
00036         namespace routing
00037         {
00038                 FloodRoutingExtension::FloodRoutingExtension()
00039                 {
00040                         // write something to the syslog
00041                         IBRCOMMON_LOGGER(info) << "Initializing flooding routing module" << IBRCOMMON_LOGGER_ENDL;
00042                 }
00043 
00044                 FloodRoutingExtension::~FloodRoutingExtension()
00045                 {
00046                         stop();
00047                         join();
00048                 }
00049 
00050                 void FloodRoutingExtension::stopExtension()
00051                 {
00052                         _taskqueue.abort();
00053                 }
00054 
00055                 void FloodRoutingExtension::notify(const dtn::core::Event *evt)
00056                 {
00057                         try {
00058                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00059                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00060                                 return;
00061                         } catch (const std::bad_cast&) { };
00062 
00063                         try {
00064                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00065                                 const dtn::core::Node &n = nodeevent.getNode();
00066 
00067                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00068                                 {
00069                                         const dtn::data::EID &eid = n.getEID();
00070 
00071                                         // send all (multi-hop) bundles in the storage to the neighbor
00072                                         _taskqueue.push( new SearchNextBundleTask(eid) );
00073                                 }
00074                                 return;
00075                         } catch (const std::bad_cast&) { };
00076 
00077                         try {
00078                                 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt);
00079 
00080                                 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP)
00081                                 {
00082                                         // send all (multi-hop) bundles in the storage to the neighbor
00083                                         _taskqueue.push( new SearchNextBundleTask(ce.peer) );
00084                                 }
00085                                 return;
00086                         } catch (const std::bad_cast&) { };
00087 
00088                         // The bundle transfer has been aborted
00089                         try {
00090                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00091 
00092                                 // transfer the next bundle to this destination
00093                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00094                                 return;
00095                         } catch (const std::bad_cast&) { };
00096 
00097                         // A bundle transfer was successful
00098                         try {
00099                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00100 
00101                                 // transfer the next bundle to this destination
00102                                 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) );
00103                                 return;
00104                         } catch (const std::bad_cast&) { };
00105                 }
00106 
00107                 bool FloodRoutingExtension::__cancellation()
00108                 {
00109                         _taskqueue.abort();
00110                         return true;
00111                 }
00112 
00113                 void FloodRoutingExtension::run()
00114                 {
00115                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00116                         {
00117                         public:
00118                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00119                                  : _entry(entry)
00120                                 {};
00121 
00122                                 virtual ~BundleFilter() {};
00123 
00124                                 virtual size_t limit() const { return 10; };
00125 
00126                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00127                                 {
00128                                         // check Scope Control Block - do not forward bundles with hop limit == 0
00129                                         if (meta.hopcount == 0)
00130                                         {
00131                                                 return false;
00132                                         }
00133 
00134                                         // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
00135                                         if ((meta.hopcount <= 1) && (meta.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)))
00136                                         {
00137                                                 return false;
00138                                         }
00139 
00140                                         // do not forward to any blacklisted destination
00141                                         const dtn::data::EID dest = meta.destination.getNode();
00142                                         if (_blacklist.find(dest) != _blacklist.end())
00143                                         {
00144                                                 return false;
00145                                         }
00146 
00147                                         // do not forward bundles already known by the destination
00148                                         if (_entry.has(meta))
00149                                         {
00150                                                 return false;
00151                                         }
00152 
00153                                         return true;
00154                                 };
00155 
00156                                 void blacklist(const dtn::data::EID& id)
00157                                 {
00158                                         _blacklist.insert(id);
00159                                 };
00160 
00161                         private:
00162                                 std::set<dtn::data::EID> _blacklist;
00163                                 const NeighborDatabase::NeighborEntry &_entry;
00164                         };
00165 
00166                         dtn::core::BundleStorage &storage = (**this).getStorage();
00167 
00168                         while (true)
00169                         {
00170                                 try {
00171                                         Task *t = _taskqueue.getnpop(true);
00172                                         ibrcommon::AutoDelete<Task> killer(t);
00173 
00174                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing flooding task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00175 
00176                                         try {
00177                                                 try {
00178                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00179                                                         NeighborDatabase &db = (**this).getNeighborDB();
00180 
00181                                                         ibrcommon::MutexLock l(db);
00182                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00183 
00184                                                         // get the bundle filter of the neighbor
00185                                                         BundleFilter filter(entry);
00186 
00187                                                         // some debug
00188                                                         IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00189 
00190                                                         // blacklist the neighbor itself, because this is handles by neighbor routing extension
00191                                                         filter.blacklist(task.eid);
00192 
00193                                                         // query all bundles from the storage
00194                                                         const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00195 
00196                                                         // send the bundles as long as we have resources
00197                                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00198                                                         {
00199                                                                 try {
00200                                                                         // transfer the bundle to the neighbor
00201                                                                         transferTo(entry, *iter);
00202                                                                 } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00203                                                         }
00204                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00205                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00206                                                 } catch (const std::bad_cast&) { };
00207 
00208                                                 try {
00209                                                         dynamic_cast<ProcessBundleTask&>(*t);
00210 
00211                                                         // new bundles are forwarded to all neighbors
00212                                                         const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00213 
00214                                                         for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00215                                                         {
00216                                                                 const dtn::core::Node &n = (*iter);
00217 
00218                                                                 // transfer the next bundle to this destination
00219                                                                 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00220                                                         }
00221                                                 } catch (const std::bad_cast&) { };
00222                                         } catch (const ibrcommon::Exception &ex) {
00223                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in FloodRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00224                                         }
00225                                 } catch (const std::exception&) {
00226                                         return;
00227                                 }
00228 
00229                                 yield();
00230                         }
00231                 }
00232 
00233                 /****************************************/
00234 
00235                 FloodRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00236                  : eid(e)
00237                 { }
00238 
00239                 FloodRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00240                 { }
00241 
00242                 std::string FloodRoutingExtension::SearchNextBundleTask::toString()
00243                 {
00244                         return "SearchNextBundleTask: " + eid.getString();
00245                 }
00246 
00247                 /****************************************/
00248 
00249                 FloodRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00250                  : bundle(meta), origin(o)
00251                 { }
00252 
00253                 FloodRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00254                 { }
00255 
00256                 std::string FloodRoutingExtension::ProcessBundleTask::toString()
00257                 {
00258                         return "ProcessBundleTask: " + bundle.toString();
00259                 }
00260         }
00261 }