IBR-DTNSuite 0.6

daemon/src/routing/NeighborRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * NeighborRoutingExtension.cpp
00003  *
00004  *  Created on: 16.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "routing/NeighborRoutingExtension.h"
00010 #include "routing/QueueBundleEvent.h"
00011 #include "core/TimeEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/BundleExpiredEvent.h"
00015 #include "core/NodeEvent.h"
00016 #include "core/Node.h"
00017 #include "net/ConnectionManager.h"
00018 #include "ibrcommon/thread/MutexLock.h"
00019 #include "core/SimpleBundleStorage.h"
00020 #include "core/BundleEvent.h"
00021 #include <ibrcommon/Logger.h>
00022 #include <ibrcommon/AutoDelete.h>
00023 
00024 #ifdef HAVE_SQLITE
00025 #include "core/SQLiteBundleStorage.h"
00026 #endif
00027 
00028 #include <functional>
00029 #include <list>
00030 #include <algorithm>
00031 #include <typeinfo>
00032 
00033 namespace dtn
00034 {
00035         namespace routing
00036         {
00037                 NeighborRoutingExtension::NeighborRoutingExtension()
00038                 {
00039                 }
00040 
00041                 NeighborRoutingExtension::~NeighborRoutingExtension()
00042                 {
00043                         stop();
00044                         join();
00045                 }
00046 
00047                 void NeighborRoutingExtension::stopExtension()
00048                 {
00049                         _taskqueue.abort();
00050                 }
00051 
00052                 bool NeighborRoutingExtension::__cancellation()
00053                 {
00054                         _taskqueue.abort();
00055                         return true;
00056                 }
00057 
00058                 void NeighborRoutingExtension::run()
00059                 {
00060 #ifdef HAVE_SQLITE
00061                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00062 #else
00063                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00064 #endif
00065                         {
00066                         public:
00067                                 BundleFilter(const dtn::data::EID &destination)
00068                                  : _destination(destination)
00069                                 {};
00070 
00071                                 virtual ~BundleFilter() {};
00072 
00073                                 virtual size_t limit() const { return 10; };
00074 
00075                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00076                                 {
00077                                         if (_destination.getNode() != meta.destination.getNode())
00078                                         {
00079                                                 return false;
00080                                         }
00081 
00082                                         return true;
00083                                 };
00084 
00085 #ifdef HAVE_SQLITE
00086                                 const std::string getWhere() const
00087                                 {
00088                                         return "destination LIKE ?";
00089                                 };
00090 
00091                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00092                                 {
00093                                         const std::string d = _destination.getNode().getString() + "/%";
00094                                         sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT);
00095                                         return offset + 1;
00096                                 }
00097 #endif
00098 
00099                         private:
00100                                 const dtn::data::EID &_destination;
00101                         };
00102 
00103                         dtn::core::BundleStorage &storage = (**this).getStorage();
00104 
00105                         while (true)
00106                         {
00107                                 try {
00108                                         Task *t = _taskqueue.getnpop(true);
00109                                         ibrcommon::AutoDelete<Task> killer(t);
00110 
00111                                         IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00112 
00118                                         try {
00119                                                 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00120 
00121                                                 try {
00122                                                         // create a new bundle filter
00123                                                         BundleFilter filter(task.eid);
00124 
00125                                                         // query an unknown bundle from the storage, the list contains max. 10 items.
00126                                                         const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00127 
00128                                                         IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00129 
00130                                                         // send the bundles as long as we have resources
00131                                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00132                                                         {
00133                                                                 try {
00134                                                                         // transfer the bundle to the neighbor
00135                                                                         transferTo(task.eid, *iter);
00136                                                                 } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00137                                                         }
00138                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { };
00139                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00140                                         } catch (const std::bad_cast&) { };
00141 
00145                                         try {
00146                                                 dynamic_cast<ProcessBundleTask&>(*t);
00147 
00148                                                 // new bundles trigger a recheck for all neighbors
00149                                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00150 
00151                                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00152                                                 {
00153                                                         const dtn::core::Node &n = (*iter);
00154 
00155                                                         // transfer the next bundle to this destination
00156                                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00157                                                 }
00158                                         } catch (const std::bad_cast&) { };
00159 
00160                                 } catch (const std::exception &ex) {
00161                                         IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00162                                         return;
00163                                 }
00164 
00165                                 yield();
00166                         }
00167                 }
00168 
00169                 void NeighborRoutingExtension::notify(const dtn::core::Event *evt)
00170                 {
00171                         try {
00172                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00173                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00174                                 return;
00175                         } catch (const std::bad_cast&) { };
00176 
00177                         try {
00178                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00179                                 const dtn::data::MetaBundle &meta = completed.getBundle();
00180                                 const dtn::data::EID &peer = completed.getPeer();
00181 
00182                                 if ((meta.destination.getNode() == peer.getNode())
00183                                                 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00184                                 {
00185                                         try {
00186                                                 dtn::core::BundleStorage &storage = (**this).getStorage();
00187 
00188                                                 // bundle has been delivered to its destination
00189                                                 // delete it from our storage
00190                                                 storage.remove(meta);
00191 
00192                                                 IBRCOMMON_LOGGER(notice) << "singleton bundle delivered and removed: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00193 
00194                                                 // gen a report
00195                                                 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00196                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00197 
00198                                         // transfer the next bundle to this destination
00199                                         _taskqueue.push( new SearchNextBundleTask( peer ) );
00200                                 }
00201                                 return;
00202                         } catch (const std::bad_cast&) { };
00203 
00204                         try {
00205                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00206                                 const dtn::data::EID &peer = aborted.getPeer();
00207                                 const dtn::data::BundleID &id = aborted.getBundleID();
00208 
00209                                 switch (aborted.reason)
00210                                 {
00211                                         case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00212                                                 break;
00213 
00214                                         case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00215                                                 break;
00216 
00217                                         case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00218                                                 break;
00219 
00220                                         case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00221                                                 return;
00222 
00223                                         case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00224                                         {
00225                                                 try {
00226                                                         const dtn::data::MetaBundle meta = (**this).getStorage().get(id);
00227 
00228                                                         // if the bundle has been sent by this module delete it
00229                                                         if ((meta.destination.getNode() == peer.getNode())
00230                                                                         && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00231                                                         {
00232                                                                 // bundle is not deliverable
00233                                                                 (**this).getStorage().remove(id);
00234                                                         }
00235                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00236                                         }
00237                                         break;
00238                                 }
00239 
00240                                 // transfer the next bundle to this destination
00241                                 _taskqueue.push( new SearchNextBundleTask( peer ) );
00242 
00243                                 return;
00244                         } catch (const std::bad_cast&) { };
00245 
00246                         // If a new neighbor comes available, send him a request for the summary vector
00247                         // If a neighbor went away we can free the stored summary vector
00248                         try {
00249                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00250                                 const dtn::core::Node &n = nodeevent.getNode();
00251 
00252                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00253                                 {
00254                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00255                                 }
00256 
00257                                 return;
00258                         } catch (const std::bad_cast&) { };
00259                 }
00260 
00261                 /****************************************/
00262 
00263                 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00264                  : eid(e)
00265                 { }
00266 
00267                 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00268                 { }
00269 
00270                 std::string NeighborRoutingExtension::SearchNextBundleTask::toString()
00271                 {
00272                         return "SearchNextBundleTask: " + eid.getString();
00273                 }
00274 
00275                 /****************************************/
00276 
00277                 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00278                  : bundle(meta), origin(o)
00279                 { }
00280 
00281                 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00282                 { }
00283 
00284                 std::string NeighborRoutingExtension::ProcessBundleTask::toString()
00285                 {
00286                         return "ProcessBundleTask: " + bundle.toString();
00287                 }
00288         }
00289 }