• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/routing/FloodRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * FloodRoutingExtension.cpp
00003  *
00004  *  Created on: 18.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "routing/FloodRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "net/TransferAbortedEvent.h"
00012 #include "core/BundleExpiredEvent.h"
00013 #include "core/NodeEvent.h"
00014 #include "core/TimeEvent.h"
00015 #include "core/Node.h"
00016 #include "net/ConnectionManager.h"
00017 #include "Configuration.h"
00018 #include "core/BundleCore.h"
00019 #include "core/SimpleBundleStorage.h"
00020 
00021 #include <ibrdtn/data/MetaBundle.h>
00022 #include <ibrcommon/thread/MutexLock.h>
00023 #include <ibrcommon/Logger.h>
00024 #include <ibrcommon/AutoDelete.h>
00025 
00026 #include <functional>
00027 #include <list>
00028 #include <algorithm>
00029 
00030 #include <iomanip>
00031 #include <ios>
00032 #include <iostream>
00033 
00034 #include <stdlib.h>
00035 #include <typeinfo>
00036 
00037 namespace dtn
00038 {
00039         namespace routing
00040         {
00041                 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00042                         bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00043                                 return n1 == n2;
00044                         }
00045                 };
00046 
00047                 FloodRoutingExtension::FloodRoutingExtension()
00048                 {
00049                         // get configuration
00050                         dtn::daemon::Configuration &conf = dtn::daemon::Configuration::getInstance();
00051 
00052                         // write something to the syslog
00053                         IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module for node " << conf.getNodename() << IBRCOMMON_LOGGER_ENDL;
00054                 }
00055 
00056                 FloodRoutingExtension::~FloodRoutingExtension()
00057                 {
00058                         stop();
00059                         join();
00060                 }
00061 
00062                 void FloodRoutingExtension::stopExtension()
00063                 {
00064                         _taskqueue.abort();
00065                 }
00066 
00067                 void FloodRoutingExtension::notify(const dtn::core::Event *evt)
00068                 {
00069                         try {
00070                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00071                                 _taskqueue.push( new ProcessBundleTask(queued.bundle) );
00072                         } catch (std::bad_cast ex) { };
00073 
00074                         try {
00075                                 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00076 
00077                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00078                                 {
00079                                         // check all lists for expired entries
00080                                         _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00081                                 }
00082                         } catch (std::bad_cast ex) { };
00083 
00084                         try {
00085                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00086 
00087                                 dtn::core::Node n = nodeevent.getNode();
00088                                 dtn::data::EID eid(n.getURI());
00089 
00090                                 switch (nodeevent.getAction())
00091                                 {
00092                                         case NODE_AVAILABLE:
00093                                         {
00094                                                 ibrcommon::MutexLock l(_list_mutex);
00095                                                 _neighbors.setAvailable(eid);
00096 
00097                                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00098                                         }
00099                                         break;
00100 
00101                                         case NODE_UNAVAILABLE:
00102                                         {
00103                                                 ibrcommon::MutexLock l(_list_mutex);
00104                                                 _neighbors.setUnavailable(eid);
00105                                         }
00106                                         break;
00107 
00108                                         default:
00109                                                 break;
00110                                 }
00111                         } catch (std::bad_cast ex) { };
00112 
00113                         // TransferAbortedEvent: send next bundle
00114                         try {
00115                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00116                                 dtn::data::EID eid = aborted.getPeer();
00117                                 dtn::data::BundleID id = aborted.getBundleID();
00118 
00119                                 switch (aborted.reason)
00120                                 {
00121                                         case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00122                                         {
00123                                                 break;
00124                                         }
00125 
00126                                         case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00127                                         {
00128                                                 // transfer the next bundle to this destination
00129                                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00130                                                 break;
00131                                         }
00132 
00133                                         case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00134                                         {
00135                                                 break;
00136                                         }
00137 
00138                                         // How to exclude several bundles? Combine two bloomfilters? Use a blocklist. Delay transfers to the node?
00139                                         case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00140                                         {
00141                                                 // add the transferred bundle to the bloomfilter of the receiver
00142                                                 ibrcommon::MutexLock l(_list_mutex);
00143                                                 ibrcommon::BloomFilter &bf = _neighbors.get(eid)._filter;
00144                                                 bf.insert(id.toString());
00145 
00146                                                 if (IBRCOMMON_LOGGER_LEVEL >= 40)
00147                                                 {
00148                                                         IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL;
00149                                                 }
00150 
00151                                                 // transfer the next bundle to this destination
00152                                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00153                                                 break;
00154                                         }
00155 
00156                                         case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00157                                         {
00158                                                 // transfer the next bundle to this destination
00159                                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00160                                                 break;
00161                                         }
00162                                 }
00163                         } catch (std::bad_cast ex) { };
00164 
00165                         try {
00166                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00167 
00168                                 dtn::data::EID eid = completed.getPeer();
00169                                 dtn::data::MetaBundle meta = completed.getBundle();
00170 
00171                                 // delete the bundle in the storage if
00172                                 if ( EID(eid.getNodeEID()) == EID(meta.destination.getNodeEID()) )
00173                                 {
00174                                         try {
00175                                                 // bundle has been delivered to its destination
00176                                                 // TODO: generate a "delete" message for routing algorithm
00177 
00178                                                 // delete it from our storage
00179                                                 getRouter()->getStorage().remove(meta);
00180 
00181                                                 IBRCOMMON_LOGGER_DEBUG(15) << "bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00182 
00183                                                 // add it to the purge vector
00184                                                 _purge_vector.add(meta);
00185                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00186 
00187                                         }
00188                                 }
00189                                 else
00190                                 {
00191                                         // add the transferred bundle to the bloomfilter of the receiver
00192                                         ibrcommon::MutexLock l(_list_mutex);
00193                                         ibrcommon::BloomFilter &bf = _neighbors.get(eid)._filter;
00194                                         bf.insert(meta.toString());
00195 
00196                                         if (IBRCOMMON_LOGGER_LEVEL >= 40)
00197                                         {
00198                                                 IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL;
00199                                         }
00200                                 }
00201 
00202                                 // transfer the next bundle to this destination
00203                                 _taskqueue.push( new SearchNextBundleTask( eid ) );
00204 
00205                         } catch (std::bad_cast ex) { };
00206                 }
00207 
00208                 bool FloodRoutingExtension::__cancellation()
00209                 {
00210                         _taskqueue.abort();
00211                         return true;
00212                 }
00213 
00214                 void FloodRoutingExtension::run()
00215                 {
00216                         while (true)
00217                         {
00218                                 try {
00219                                         Task *t = _taskqueue.getnpop(true);
00220                                         ibrcommon::AutoDelete<Task> killer(t);
00221 
00222                                         IBRCOMMON_LOGGER_DEBUG(50) << "processing flooding task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00223 
00224                                         try {
00225                                                 try {
00226                                                         ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00227 
00228                                                         ibrcommon::MutexLock l(_list_mutex);
00229                                                         _purge_vector.expire(task.timestamp);
00230                                                 } catch (std::bad_cast) { };
00231 
00232                                                 try {
00233                                                         SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00234 
00235                                                         IBRCOMMON_LOGGER_DEBUG(40) << "search one bundle not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00236 
00237                                                         ibrcommon::MutexLock l(_list_mutex);
00238                                                         ibrcommon::BloomFilter &bf = _neighbors.get(task.eid)._filter;
00239                                                         const dtn::data::BundleID b = getRouter()->getStorage().getByFilter(bf);
00240                                                         getRouter()->transferTo(task.eid, b);
00241 
00242                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00243                                                 } catch (std::bad_cast) { };
00244 
00245                                                 try {
00246                                                         dynamic_cast<ProcessBundleTask&>(*t);
00247                                                         ibrcommon::MutexLock l(_list_mutex);
00248 
00249                                                         // trigger transmission for all neighbors
00250                                                         std::set<dtn::data::EID> list;
00251                                                         {
00252                                                                 list = _neighbors.getAvailable();
00253                                                         }
00254 
00255                                                         for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00256                                                         {
00257                                                                 // transfer the next bundle to this destination
00258                                                                 _taskqueue.push( new SearchNextBundleTask( *iter ) );
00259                                                         }
00260                                                 } catch (dtn::data::Bundle::NoSuchBlockFoundException) {
00261                                                         // if the bundle does not contains the expected block
00262                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00263                                                         // if the bundle is not in the storage we have nothing to do
00264                                                 } catch (std::bad_cast) { };
00265                                         } catch (ibrcommon::Exception ex) {
00266                                                 IBRCOMMON_LOGGER(error) << "Exception occurred in FloodRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00267                                         }
00268                                 } catch (std::exception) {
00269                                         return;
00270                                 }
00271 
00272                                 yield();
00273                         }
00274                 }
00275 
00276                 /****************************************/
00277 
00278                 FloodRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00279                  : eid(e)
00280                 { }
00281 
00282                 FloodRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00283                 { }
00284 
00285                 std::string FloodRoutingExtension::SearchNextBundleTask::toString()
00286                 {
00287                         return "SearchNextBundleTask: " + eid.getString();
00288                 }
00289 
00290                 /****************************************/
00291 
00292                 FloodRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta)
00293                  : bundle(meta)
00294                 { }
00295 
00296                 FloodRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00297                 { }
00298 
00299                 std::string FloodRoutingExtension::ProcessBundleTask::toString()
00300                 {
00301                         return "ProcessBundleTask: " + bundle.toString();
00302                 }
00303 
00304                 /****************************************/
00305 
00306                 FloodRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00307                  : timestamp(t)
00308                 { }
00309 
00310                 FloodRoutingExtension::ExpireTask::~ExpireTask()
00311                 { }
00312 
00313                 std::string FloodRoutingExtension::ExpireTask::toString()
00314                 {
00315                         return "ExpireTask";
00316                 }
00317         }
00318 }

Generated on Thu Nov 11 2010 09:49:47 for IBR-DTNSuite by  doxygen 1.7.1