IBR-DTNSuite 0.6

daemon/src/routing/NeighborRoutingExtension.cpp

Go to the documentation of this file.
00001 /*
00002  * NeighborRoutingExtension.cpp
00003  *
00004  *  Created on: 16.02.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "routing/NeighborRoutingExtension.h"
00010 #include "routing/QueueBundleEvent.h"
00011 #include "core/TimeEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "net/ConnectionEvent.h"
00015 #include "core/BundleExpiredEvent.h"
00016 #include "core/NodeEvent.h"
00017 #include "core/Node.h"
00018 #include "net/ConnectionManager.h"
00019 #include "ibrcommon/thread/MutexLock.h"
00020 #include "core/SimpleBundleStorage.h"
00021 #include "core/BundleEvent.h"
00022 #include <ibrcommon/Logger.h>
00023 #include <ibrcommon/AutoDelete.h>
00024 
00025 #ifdef HAVE_SQLITE
00026 #include "core/SQLiteBundleStorage.h"
00027 #endif
00028 
00029 #include <functional>
00030 #include <list>
00031 #include <algorithm>
00032 #include <typeinfo>
00033 
00034 namespace dtn
00035 {
00036         namespace routing
00037         {
00038                 NeighborRoutingExtension::NeighborRoutingExtension()
00039                 {
00040                 }
00041 
00042                 NeighborRoutingExtension::~NeighborRoutingExtension()
00043                 {
00044                         stop();
00045                         join();
00046                 }
00047 
00048                 void NeighborRoutingExtension::stopExtension()
00049                 {
00050                         _taskqueue.abort();
00051                 }
00052 
00053                 bool NeighborRoutingExtension::__cancellation()
00054                 {
00055                         _taskqueue.abort();
00056                         return true;
00057                 }
00058 
00059                 void NeighborRoutingExtension::run()
00060                 {
00061 #ifdef HAVE_SQLITE
00062                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00063 #else
00064                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00065 #endif
00066                         {
00067                         public:
00068                                 BundleFilter(const dtn::data::EID &destination)
00069                                  : _destination(destination)
00070                                 {};
00071 
00072                                 virtual ~BundleFilter() {};
00073 
00074                                 virtual size_t limit() const { return 10; };
00075 
00076                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00077                                 {
00078                                         if (_destination.getNode() != meta.destination.getNode())
00079                                         {
00080                                                 return false;
00081                                         }
00082 
00083                                         return true;
00084                                 };
00085 
00086 #ifdef HAVE_SQLITE
00087                                 const std::string getWhere() const
00088                                 {
00089                                         return "destination LIKE ?";
00090                                 };
00091 
00092                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00093                                 {
00094                                         const std::string d = _destination.getNode().getString() + "/%";
00095                                         sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT);
00096                                         return offset + 1;
00097                                 }
00098 #endif
00099 
00100                         private:
00101                                 const dtn::data::EID &_destination;
00102                         };
00103 
00104                         dtn::core::BundleStorage &storage = (**this).getStorage();
00105 
00106                         while (true)
00107                         {
00108                                 try {
00109                                         Task *t = _taskqueue.getnpop(true);
00110                                         ibrcommon::AutoDelete<Task> killer(t);
00111 
00112                                         IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00113 
00119                                         try {
00120                                                 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00121 
00122                                                 try {
00123                                                         // create a new bundle filter
00124                                                         BundleFilter filter(task.eid);
00125 
00126                                                         // query an unknown bundle from the storage, the list contains max. 10 items.
00127                                                         const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00128 
00129                                                         IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00130 
00131                                                         // send the bundles as long as we have resources
00132                                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00133                                                         {
00134                                                                 try {
00135                                                                         // transfer the bundle to the neighbor
00136                                                                         transferTo(task.eid, *iter);
00137                                                                 } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00138                                                         }
00139                                                 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { };
00140                                         } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00141                                         } catch (const std::bad_cast&) { };
00142 
00146                                         try {
00147                                                 dynamic_cast<ProcessBundleTask&>(*t);
00148 
00149                                                 // new bundles trigger a recheck for all neighbors
00150                                                 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00151 
00152                                                 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00153                                                 {
00154                                                         const dtn::core::Node &n = (*iter);
00155 
00156                                                         // transfer the next bundle to this destination
00157                                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00158                                                 }
00159                                         } catch (const std::bad_cast&) { };
00160 
00161                                 } catch (const std::exception &ex) {
00162                                         IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00163                                         return;
00164                                 }
00165 
00166                                 yield();
00167                         }
00168                 }
00169 
00170                 void NeighborRoutingExtension::notify(const dtn::core::Event *evt)
00171                 {
00172                         try {
00173                                 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00174                                 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00175                                 return;
00176                         } catch (const std::bad_cast&) { };
00177 
00178                         try {
00179                                 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00180                                 const dtn::data::MetaBundle &meta = completed.getBundle();
00181                                 const dtn::data::EID &peer = completed.getPeer();
00182 
00183                                 if ((meta.destination.getNode() == peer.getNode())
00184                                                 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00185                                 {
00186                                         try {
00187                                                 dtn::core::BundleStorage &storage = (**this).getStorage();
00188 
00189                                                 // bundle has been delivered to its destination
00190                                                 // delete it from our storage
00191                                                 storage.remove(meta);
00192 
00193                                                 IBRCOMMON_LOGGER(notice) << "singleton bundle delivered and removed: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00194 
00195                                                 // gen a report
00196                                                 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00197                                         } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00198 
00199                                         // transfer the next bundle to this destination
00200                                         _taskqueue.push( new SearchNextBundleTask( peer ) );
00201                                 }
00202                                 return;
00203                         } catch (const std::bad_cast&) { };
00204 
00205                         try {
00206                                 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00207                                 const dtn::data::EID &peer = aborted.getPeer();
00208                                 const dtn::data::BundleID &id = aborted.getBundleID();
00209 
00210                                 switch (aborted.reason)
00211                                 {
00212                                         case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00213                                                 break;
00214 
00215                                         case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00216                                                 break;
00217 
00218                                         case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00219                                                 break;
00220 
00221                                         case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00222                                                 return;
00223 
00224                                         case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00225                                         {
00226                                                 try {
00227                                                         const dtn::data::MetaBundle meta = (**this).getStorage().get(id);
00228 
00229                                                         // if the bundle has been sent by this module delete it
00230                                                         if ((meta.destination.getNode() == peer.getNode())
00231                                                                         && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00232                                                         {
00233                                                                 // bundle is not deliverable
00234                                                                 (**this).getStorage().remove(id);
00235                                                         }
00236                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00237                                         }
00238                                         break;
00239                                 }
00240 
00241                                 // transfer the next bundle to this destination
00242                                 _taskqueue.push( new SearchNextBundleTask( peer ) );
00243 
00244                                 return;
00245                         } catch (const std::bad_cast&) { };
00246 
00247                         // If a new neighbor comes available, send him a request for the summary vector
00248                         // If a neighbor went away we can free the stored summary vector
00249                         try {
00250                                 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00251                                 const dtn::core::Node &n = nodeevent.getNode();
00252 
00253                                 if (nodeevent.getAction() == NODE_AVAILABLE)
00254                                 {
00255                                         _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00256                                 }
00257 
00258                                 return;
00259                         } catch (const std::bad_cast&) { };
00260 
00261                         try {
00262                                 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt);
00263 
00264                                 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP)
00265                                 {
00266                                         // send all (multi-hop) bundles in the storage to the neighbor
00267                                         _taskqueue.push( new SearchNextBundleTask(ce.peer) );
00268                                 }
00269                                 return;
00270                         } catch (const std::bad_cast&) { };
00271                 }
00272 
00273                 /****************************************/
00274 
00275                 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00276                  : eid(e)
00277                 { }
00278 
00279                 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00280                 { }
00281 
00282                 std::string NeighborRoutingExtension::SearchNextBundleTask::toString()
00283                 {
00284                         return "SearchNextBundleTask: " + eid.getString();
00285                 }
00286 
00287                 /****************************************/
00288 
00289                 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00290                  : bundle(meta), origin(o)
00291                 { }
00292 
00293                 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00294                 { }
00295 
00296                 std::string NeighborRoutingExtension::ProcessBundleTask::toString()
00297                 {
00298                         return "ProcessBundleTask: " + bundle.toString();
00299                 }
00300         }
00301 }