Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "config.h"
00009 #include "routing/NeighborRoutingExtension.h"
00010 #include "routing/QueueBundleEvent.h"
00011 #include "core/TimeEvent.h"
00012 #include "net/TransferCompletedEvent.h"
00013 #include "net/TransferAbortedEvent.h"
00014 #include "core/BundleExpiredEvent.h"
00015 #include "core/NodeEvent.h"
00016 #include "core/Node.h"
00017 #include "net/ConnectionManager.h"
00018 #include "ibrcommon/thread/MutexLock.h"
00019 #include "core/SimpleBundleStorage.h"
00020 #include <ibrcommon/Logger.h>
00021 #include <ibrcommon/AutoDelete.h>
00022
00023 #ifdef HAVE_SQLITE
00024 #include "core/SQLiteBundleStorage.h"
00025 #endif
00026
00027 #include <functional>
00028 #include <list>
00029 #include <algorithm>
00030 #include <typeinfo>
00031
00032 namespace dtn
00033 {
00034 namespace routing
00035 {
00036 NeighborRoutingExtension::NeighborRoutingExtension()
00037 {
00038 }
00039
00040 NeighborRoutingExtension::~NeighborRoutingExtension()
00041 {
00042 stop();
00043 join();
00044 }
00045
00046 void NeighborRoutingExtension::stopExtension()
00047 {
00048 _taskqueue.abort();
00049 }
00050
00051 bool NeighborRoutingExtension::__cancellation()
00052 {
00053 _taskqueue.abort();
00054 return true;
00055 }
00056
00057 void NeighborRoutingExtension::run()
00058 {
00059 #ifdef HAVE_SQLITE
00060 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00061 #else
00062 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00063 #endif
00064 {
00065 public:
00066 BundleFilter(const dtn::data::EID &destination)
00067 : _destination(destination)
00068 {};
00069
00070 virtual ~BundleFilter() {};
00071
00072 virtual size_t limit() const { return 10; };
00073
00074 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00075 {
00076 if (_destination.getNodeEID() != meta.destination.getNodeEID())
00077 {
00078 return false;
00079 }
00080
00081 return true;
00082 };
00083
00084 #ifdef HAVE_SQLITE
00085 const std::string getWhere() const
00086 {
00087 return "destination LIKE ?";
00088 };
00089
00090 size_t bind(sqlite3_stmt *st, size_t offset) const
00091 {
00092 const std::string d = _destination.getNodeEID() + "/%";
00093 sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT);
00094 return offset + 1;
00095 }
00096 #endif
00097
00098 private:
00099 const dtn::data::EID &_destination;
00100 };
00101
00102 dtn::core::BundleStorage &storage = (**this).getStorage();
00103
00104 while (true)
00105 {
00106 try {
00107 Task *t = _taskqueue.getnpop(true);
00108 ibrcommon::AutoDelete<Task> killer(t);
00109
00110 IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
00111
00117 try {
00118 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00119
00120 try {
00121
00122 BundleFilter filter(task.eid);
00123
00124
00125 const std::list<dtn::data::MetaBundle> list = storage.get(filter);
00126
00127 IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
00128
00129
00130 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00131 {
00132 try {
00133
00134 transferTo(task.eid, *iter);
00135 } catch (const NeighborDatabase::AlreadyInTransitException&) { };
00136 }
00137 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { };
00138 } catch (const NeighborDatabase::NeighborNotAvailableException&) {
00139 } catch (const std::bad_cast&) { };
00140
00144 try {
00145 dynamic_cast<ProcessBundleTask&>(*t);
00146
00147
00148 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors();
00149
00150 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++)
00151 {
00152 const dtn::core::Node &n = (*iter);
00153
00154
00155 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00156 }
00157 } catch (const std::bad_cast&) { };
00158
00159 } catch (const std::exception &ex) {
00160 IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00161 return;
00162 }
00163
00164 yield();
00165 }
00166 }
00167
00168 void NeighborRoutingExtension::notify(const dtn::core::Event *evt)
00169 {
00170 try {
00171 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00172 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) );
00173 return;
00174 } catch (const std::bad_cast&) { };
00175
00176 try {
00177 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00178 const dtn::data::MetaBundle &meta = completed.getBundle();
00179 const dtn::data::EID &peer = completed.getPeer();
00180
00181 if ((meta.destination.getNodeEID() == peer.getNodeEID())
00182 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00183 {
00184 try {
00185 dtn::core::BundleStorage &storage = (**this).getStorage();
00186
00187
00188
00189 storage.remove(meta);
00190
00191 IBRCOMMON_LOGGER_DEBUG(15) << "singleton bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00192 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00193
00194
00195 _taskqueue.push( new SearchNextBundleTask( peer ) );
00196 }
00197 return;
00198 } catch (const std::bad_cast&) { };
00199
00200 try {
00201 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00202 const dtn::data::EID &peer = aborted.getPeer();
00203 const dtn::data::BundleID &id = aborted.getBundleID();
00204
00205 switch (aborted.reason)
00206 {
00207 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00208 break;
00209
00210 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00211 break;
00212
00213 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00214 break;
00215
00216 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00217 return;
00218
00219 case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00220 {
00221 try {
00222 const dtn::data::MetaBundle meta = (**this).getStorage().get(id);
00223
00224
00225 if ((meta.destination.getNodeEID() == peer.getNodeEID())
00226 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON))
00227 {
00228
00229 (**this).getStorage().remove(id);
00230 }
00231 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { };
00232 }
00233 break;
00234 }
00235
00236
00237 _taskqueue.push( new SearchNextBundleTask( peer ) );
00238
00239 return;
00240 } catch (const std::bad_cast&) { };
00241
00242
00243
00244 try {
00245 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00246 const dtn::core::Node &n = nodeevent.getNode();
00247
00248 if (nodeevent.getAction() == NODE_AVAILABLE)
00249 {
00250 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) );
00251 }
00252
00253 return;
00254 } catch (const std::bad_cast&) { };
00255 }
00256
00257
00258
00259 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00260 : eid(e)
00261 { }
00262
00263 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00264 { }
00265
00266 std::string NeighborRoutingExtension::SearchNextBundleTask::toString()
00267 {
00268 return "SearchNextBundleTask: " + eid.getString();
00269 }
00270
00271
00272
00273 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o)
00274 : bundle(meta), origin(o)
00275 { }
00276
00277 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00278 { }
00279
00280 std::string NeighborRoutingExtension::ProcessBundleTask::toString()
00281 {
00282 return "ProcessBundleTask: " + bundle.toString();
00283 }
00284 }
00285 }