// IBR-DTNSuite 0.6
00001 /* 00002 * NeighborRoutingExtension.cpp 00003 * 00004 * Created on: 16.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "routing/NeighborRoutingExtension.h" 00010 #include "routing/QueueBundleEvent.h" 00011 #include "core/TimeEvent.h" 00012 #include "net/TransferCompletedEvent.h" 00013 #include "net/TransferAbortedEvent.h" 00014 #include "net/ConnectionEvent.h" 00015 #include "core/BundleExpiredEvent.h" 00016 #include "core/NodeEvent.h" 00017 #include "core/Node.h" 00018 #include "net/ConnectionManager.h" 00019 #include "ibrcommon/thread/MutexLock.h" 00020 #include "core/SimpleBundleStorage.h" 00021 #include "core/BundleEvent.h" 00022 #include <ibrcommon/Logger.h> 00023 #include <ibrcommon/AutoDelete.h> 00024 00025 #ifdef HAVE_SQLITE 00026 #include "core/SQLiteBundleStorage.h" 00027 #endif 00028 00029 #include <functional> 00030 #include <list> 00031 #include <algorithm> 00032 #include <typeinfo> 00033 00034 namespace dtn 00035 { 00036 namespace routing 00037 { 00038 NeighborRoutingExtension::NeighborRoutingExtension() 00039 { 00040 } 00041 00042 NeighborRoutingExtension::~NeighborRoutingExtension() 00043 { 00044 stop(); 00045 join(); 00046 } 00047 00048 void NeighborRoutingExtension::stopExtension() 00049 { 00050 _taskqueue.abort(); 00051 } 00052 00053 bool NeighborRoutingExtension::__cancellation() 00054 { 00055 _taskqueue.abort(); 00056 return true; 00057 } 00058 00059 void NeighborRoutingExtension::run() 00060 { 00061 #ifdef HAVE_SQLITE 00062 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery 00063 #else 00064 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00065 #endif 00066 { 00067 public: 00068 BundleFilter(const dtn::data::EID &destination) 00069 : _destination(destination) 00070 {}; 00071 00072 virtual ~BundleFilter() {}; 00073 00074 virtual size_t limit() const { return 10; }; 00075 00076 virtual bool 
shouldAdd(const dtn::data::MetaBundle &meta) const 00077 { 00078 if (_destination.getNode() != meta.destination.getNode()) 00079 { 00080 return false; 00081 } 00082 00083 return true; 00084 }; 00085 00086 #ifdef HAVE_SQLITE 00087 const std::string getWhere() const 00088 { 00089 return "destination LIKE ?"; 00090 }; 00091 00092 size_t bind(sqlite3_stmt *st, size_t offset) const 00093 { 00094 const std::string d = _destination.getNode().getString() + "/%"; 00095 sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT); 00096 return offset + 1; 00097 } 00098 #endif 00099 00100 private: 00101 const dtn::data::EID &_destination; 00102 }; 00103 00104 dtn::core::BundleStorage &storage = (**this).getStorage(); 00105 00106 while (true) 00107 { 00108 try { 00109 Task *t = _taskqueue.getnpop(true); 00110 ibrcommon::AutoDelete<Task> killer(t); 00111 00112 IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00113 00119 try { 00120 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00121 00122 try { 00123 // create a new bundle filter 00124 BundleFilter filter(task.eid); 00125 00126 // query an unknown bundle from the storage, the list contains max. 10 items. 
00127 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00128 00129 IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00130 00131 // send the bundles as long as we have resources 00132 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00133 { 00134 try { 00135 // transfer the bundle to the neighbor 00136 transferTo(task.eid, *iter); 00137 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00138 } 00139 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { }; 00140 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00141 } catch (const std::bad_cast&) { }; 00142 00146 try { 00147 dynamic_cast<ProcessBundleTask&>(*t); 00148 00149 // new bundles trigger a recheck for all neighbors 00150 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00151 00152 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00153 { 00154 const dtn::core::Node &n = (*iter); 00155 00156 // transfer the next bundle to this destination 00157 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00158 } 00159 } catch (const std::bad_cast&) { }; 00160 00161 } catch (const std::exception &ex) { 00162 IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00163 return; 00164 } 00165 00166 yield(); 00167 } 00168 } 00169 00170 void NeighborRoutingExtension::notify(const dtn::core::Event *evt) 00171 { 00172 try { 00173 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt); 00174 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) ); 00175 return; 00176 } catch (const std::bad_cast&) { }; 00177 00178 try { 00179 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00180 const dtn::data::MetaBundle 
&meta = completed.getBundle(); 00181 const dtn::data::EID &peer = completed.getPeer(); 00182 00183 if ((meta.destination.getNode() == peer.getNode()) 00184 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00185 { 00186 try { 00187 dtn::core::BundleStorage &storage = (**this).getStorage(); 00188 00189 // bundle has been delivered to its destination 00190 // delete it from our storage 00191 storage.remove(meta); 00192 00193 IBRCOMMON_LOGGER(notice) << "singleton bundle delivered and removed: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00194 00195 // gen a report 00196 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE); 00197 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00198 00199 // transfer the next bundle to this destination 00200 _taskqueue.push( new SearchNextBundleTask( peer ) ); 00201 } 00202 return; 00203 } catch (const std::bad_cast&) { }; 00204 00205 try { 00206 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00207 const dtn::data::EID &peer = aborted.getPeer(); 00208 const dtn::data::BundleID &id = aborted.getBundleID(); 00209 00210 switch (aborted.reason) 00211 { 00212 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED: 00213 break; 00214 00215 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED: 00216 break; 00217 00218 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED: 00219 break; 00220 00221 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN: 00222 return; 00223 00224 case dtn::net::TransferAbortedEvent::REASON_REFUSED: 00225 { 00226 try { 00227 const dtn::data::MetaBundle meta = (**this).getStorage().get(id); 00228 00229 // if the bundle has been sent by this module delete it 00230 if ((meta.destination.getNode() == peer.getNode()) 00231 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00232 { 00233 // bundle is not deliverable 00234 
(**this).getStorage().remove(id); 00235 } 00236 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00237 } 00238 break; 00239 } 00240 00241 // transfer the next bundle to this destination 00242 _taskqueue.push( new SearchNextBundleTask( peer ) ); 00243 00244 return; 00245 } catch (const std::bad_cast&) { }; 00246 00247 // If a new neighbor comes available, send him a request for the summary vector 00248 // If a neighbor went away we can free the stored summary vector 00249 try { 00250 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00251 const dtn::core::Node &n = nodeevent.getNode(); 00252 00253 if (nodeevent.getAction() == NODE_AVAILABLE) 00254 { 00255 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00256 } 00257 00258 return; 00259 } catch (const std::bad_cast&) { }; 00260 00261 try { 00262 const dtn::net::ConnectionEvent &ce = dynamic_cast<const dtn::net::ConnectionEvent&>(*evt); 00263 00264 if (ce.state == dtn::net::ConnectionEvent::CONNECTION_UP) 00265 { 00266 // send all (multi-hop) bundles in the storage to the neighbor 00267 _taskqueue.push( new SearchNextBundleTask(ce.peer) ); 00268 } 00269 return; 00270 } catch (const std::bad_cast&) { }; 00271 } 00272 00273 /****************************************/ 00274 00275 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00276 : eid(e) 00277 { } 00278 00279 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00280 { } 00281 00282 std::string NeighborRoutingExtension::SearchNextBundleTask::toString() 00283 { 00284 return "SearchNextBundleTask: " + eid.getString(); 00285 } 00286 00287 /****************************************/ 00288 00289 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o) 00290 : bundle(meta), origin(o) 00291 { } 00292 00293 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask() 00294 
{ } 00295 00296 std::string NeighborRoutingExtension::ProcessBundleTask::toString() 00297 { 00298 return "ProcessBundleTask: " + bundle.toString(); 00299 } 00300 } 00301 }