IBR-DTNSuite 0.6

daemon/src/routing/flooding/FloodRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * FloodRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/flooding/FloodRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "core/NodeEvent.h"
00011 #include "net/TransferCompletedEvent.h"
00012 #include "net/TransferAbortedEvent.h"
00013 #include "core/Node.h"
00014 #include "net/ConnectionManager.h"
00015 #include "Configuration.h"
00016 #include "core/BundleCore.h"
00017 
00018 #include <ibrdtn/data/MetaBundle.h>
00019 #include <ibrcommon/thread/MutexLock.h>
00020 #include <ibrcommon/Logger.h>
00021 #include <ibrcommon/AutoDelete.h>
00022 
00023 #include <functional>
00024 #include <list>
00025 #include <algorithm>
00026 #include <iomanip>
00027 #include <ios>
00028 #include <iostream>
00029 
00030 #include <stdlib.h>
00031 #include <typeinfo>
00032 
00033 namespace dtn
00034 {
00035         namespace routing
00036         {
00037                 FloodRoutingExtension::FloodRoutingExtension()
00038                 {
00039                         // write something to the syslog
00040                         IBRCOMMON_LOGGER(info) << "Initializing flooding routing module" << IBRCOMMON_LOGGER_ENDL;
00041                 }
00042 
00043                 FloodRoutingExtension::~FloodRoutingExtension()
00044                 {
00045                         stop();
00046                         join();
00047                 }
00048 
00049                 void FloodRoutingExtension::stopExtension()
00050                 {
00051                         _taskqueue.abort();
00052                 }
00053 
00054                 void FloodRoutingExtension::notify(const dtn::core::Event *evt)
00055                 {
00056                         try {
00057                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00058                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00059                                 return;
00060                         } catch (const std::bad_cast&) { };
00061 
00062                         try {
00063                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00064                                 const dtn::core::Node &n = nodeevent.getNode();
00065 
00066                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00067                                 {
00068                                         const dtn::data::EID &eid = n.getEID();
00069 
00070                                         // send all (multi-hop) bundles in the storage to the neighbor
00071                                         _taskqueue.push( new SearchNextBundleTask(eid) );
00072                                 }
00073                                 return;
00074                         } catch (const std::bad_cast&) { };
00075 
00076                         // The bundle transfer has been aborted
00077                         try {
00078                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00079 
00080                                 // transfer the next bundle to this destination
00081                                 _taskqueue.push( new SearchNextBundleTask( aborted.getPeer() ) );
00082                                 return;
00083                         } catch (const std::bad_cast&) { };
00084 
00085                         // A bundle transfer was successful
00086                         try {
00087                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00088 
00089                                 // transfer the next bundle to this destination
00090                                 _taskqueue.push( new SearchNextBundleTask( completed.getPeer() ) );
00091                                 return;
00092                         } catch (const std::bad_cast&) { };
00093                 }
00094 
00095                 bool FloodRoutingExtension::__cancellation()
00096                 {
00097                         _taskqueue.abort();
00098                         return true;
00099                 }
00100 
00101                 void FloodRoutingExtension::run()
00102                 {
00103                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00104                         {
00105                         public:
00106                                 BundleFilter(const NeighborDatabase::NeighborEntry &entry)
00107                                  : _entry(entry)
00108                                 {};
00109 
00110                                 virtual ~BundleFilter() {};
00111 
00112                                 virtual size_t limit() const { return 10; };
00113 
00114                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00115                                 {
00116                                         // do not forward to any blacklisted destination
00117                                         const dtn::data::EID dest = meta.destination.getNode();
00118                                         if (_blacklist.find(dest) != _blacklist.end())
00119                                         {
00120                                                 return false;
00121                                         }
00122 
00123                                         // do not forward bundles already known by the destination
00124                                         if (_entry.has(meta))
00125                                         {
00126                                                 return false;
00127                                         }
00128 
00129                                         return true;
00130                                 };
00131 
00132                                 void blacklist(const dtn::data::EID& id)
00133                                 {
00134                                         _blacklist.insert(id);
00135                                 };
00136 
00137                         private:
00138                                 std::set<dtn::data::EID> _blacklist;
00139                                 const NeighborDatabase::NeighborEntry &_entry;
00140                         };
00141 
00142                         dtn::core::BundleStorage &storage = (**this).getStorage();
00143 
00144                         while (true)
00145                         {
00146                                 try {
00147                                         Task *t = _taskqueue.getnpop(true);
00148                                         ibrcommon::AutoDelete<Task> killer(t);
00149 
00150                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing flooding task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00151 
00152                                         try {
00153                                                 try {
00154                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00155                                                         NeighborDatabase &db = (**this).getNeighborDB();
00156 
00157                                                         ibrcommon::MutexLock l(db);
00158                                                         NeighborDatabase::NeighborEntry &entry = db.get(task.eid);
00159 
00160                                                         // get the bundle filter of the neighbor
00161                                                         BundleFilter filter(entry);
00162 
00163                                                         // some debug
00164                                                         IBRCOMMON_LOGGER_DEBUG(40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00165 
00166                                                         // blacklist the neighbor itself, because this is handles by neighbor routing extension
00167                                                         filter.blacklist(task.eid);
00168 
00169                                                         // query all bundles from the storage
00170                                                         const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00171 
00172                                                         // send the bundles as long as we have resources
00173                                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00174                                                         {
00175                                                                 try {
00176                                                                         // transfer the bundle to the neighbor
00177                                                                         transferTo(entry, *iter);
00178                                                                 } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00179                                                         }
00180                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) {
00181                                                 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00182                                                 } catch (const std::bad_cast&) { };
00183 
00184                                                 try {
00185                                                         dynamic_cast<ProcessBundleTask&>(*t);
00186 
00187                                                         // new bundles are forwarded to all neighbors
00188                                                         const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00189 
00190                                                         for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00191                                                         {
00192                                                                 const dtn::core::Node &n = (*iter);
00193 
00194                                                                 // transfer the next bundle to this destination
00195                                                                 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00196                                                         }
00197                                                 } catch (const std::bad_cast&) { };
00198                                         } catch (const ibrcommon::Exception &ex) {
00199                                                 IBRCOMMON_LOGGER_DEBUG(20) << "Exception occurred in FloodRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00200                                         }
00201                                 } catch (const std::exception&) {
00202                                         return;
00203                                 }
00204 
00205                                 yield();
00206                         }
00207                 }
00208 
00209                 /****************************************/
00210 
00211                 FloodRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00212                  : eid(e)
00213                 { }
00214 
00215                 FloodRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00216                 { }
00217 
00218                 std::string FloodRoutingExtension::SearchNextBundleTask::toString()
00219                 {
00220                         return "SearchNextBundleTask: " + eid.getString();
00221                 }
00222 
00223                 /****************************************/
00224 
00225                 FloodRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00226                  : bundle(meta), origin(o)
00227                 { }
00228 
00229                 FloodRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00230                 { }
00231 
00232                 std::string FloodRoutingExtension::ProcessBundleTask::toString()
00233                 {
00234                         return "ProcessBundleTask: " + bundle.toString();
00235                 }
00236         }
00237 }