// IBR-DTNSuite 0.6
00001 /* 00002 * NeighborRoutingExtension.cpp 00003 * 00004 * Created on: 16.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "routing/NeighborRoutingExtension.h" 00010 #include "routing/QueueBundleEvent.h" 00011 #include "core/TimeEvent.h" 00012 #include "net/TransferCompletedEvent.h" 00013 #include "net/TransferAbortedEvent.h" 00014 #include "core/BundleExpiredEvent.h" 00015 #include "core/NodeEvent.h" 00016 #include "core/Node.h" 00017 #include "net/ConnectionManager.h" 00018 #include "ibrcommon/thread/MutexLock.h" 00019 #include "core/SimpleBundleStorage.h" 00020 #include "core/BundleEvent.h" 00021 #include <ibrcommon/Logger.h> 00022 #include <ibrcommon/AutoDelete.h> 00023 00024 #ifdef HAVE_SQLITE 00025 #include "core/SQLiteBundleStorage.h" 00026 #endif 00027 00028 #include <functional> 00029 #include <list> 00030 #include <algorithm> 00031 #include <typeinfo> 00032 00033 namespace dtn 00034 { 00035 namespace routing 00036 { 00037 NeighborRoutingExtension::NeighborRoutingExtension() 00038 { 00039 } 00040 00041 NeighborRoutingExtension::~NeighborRoutingExtension() 00042 { 00043 stop(); 00044 join(); 00045 } 00046 00047 void NeighborRoutingExtension::stopExtension() 00048 { 00049 _taskqueue.abort(); 00050 } 00051 00052 bool NeighborRoutingExtension::__cancellation() 00053 { 00054 _taskqueue.abort(); 00055 return true; 00056 } 00057 00058 void NeighborRoutingExtension::run() 00059 { 00060 #ifdef HAVE_SQLITE 00061 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery 00062 #else 00063 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00064 #endif 00065 { 00066 public: 00067 BundleFilter(const dtn::data::EID &destination) 00068 : _destination(destination) 00069 {}; 00070 00071 virtual ~BundleFilter() {}; 00072 00073 virtual size_t limit() const { return 10; }; 00074 00075 virtual bool shouldAdd(const dtn::data::MetaBundle 
&meta) const 00076 { 00077 if (_destination.getNode() != meta.destination.getNode()) 00078 { 00079 return false; 00080 } 00081 00082 return true; 00083 }; 00084 00085 #ifdef HAVE_SQLITE 00086 const std::string getWhere() const 00087 { 00088 return "destination LIKE ?"; 00089 }; 00090 00091 size_t bind(sqlite3_stmt *st, size_t offset) const 00092 { 00093 const std::string d = _destination.getNode().getString() + "/%"; 00094 sqlite3_bind_text(st, offset, d.c_str(), d.size(), SQLITE_TRANSIENT); 00095 return offset + 1; 00096 } 00097 #endif 00098 00099 private: 00100 const dtn::data::EID &_destination; 00101 }; 00102 00103 dtn::core::BundleStorage &storage = (**this).getStorage(); 00104 00105 while (true) 00106 { 00107 try { 00108 Task *t = _taskqueue.getnpop(true); 00109 ibrcommon::AutoDelete<Task> killer(t); 00110 00111 IBRCOMMON_LOGGER_DEBUG(5) << "processing neighbor routing task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00112 00118 try { 00119 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00120 00121 try { 00122 // create a new bundle filter 00123 BundleFilter filter(task.eid); 00124 00125 // query an unknown bundle from the storage, the list contains max. 10 items. 
00126 const std::list<dtn::data::MetaBundle> list = storage.get(filter); 00127 00128 IBRCOMMON_LOGGER_DEBUG(5) << "got " << list.size() << " items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00129 00130 // send the bundles as long as we have resources 00131 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00132 { 00133 try { 00134 // transfer the bundle to the neighbor 00135 transferTo(task.eid, *iter); 00136 } catch (const NeighborDatabase::AlreadyInTransitException&) { }; 00137 } 00138 } catch (const NeighborDatabase::NoMoreTransfersAvailable&) { }; 00139 } catch (const NeighborDatabase::NeighborNotAvailableException&) { 00140 } catch (const std::bad_cast&) { }; 00141 00145 try { 00146 dynamic_cast<ProcessBundleTask&>(*t); 00147 00148 // new bundles trigger a recheck for all neighbors 00149 const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getNeighbors(); 00150 00151 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); iter++) 00152 { 00153 const dtn::core::Node &n = (*iter); 00154 00155 // transfer the next bundle to this destination 00156 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00157 } 00158 } catch (const std::bad_cast&) { }; 00159 00160 } catch (const std::exception &ex) { 00161 IBRCOMMON_LOGGER_DEBUG(20) << "neighbor routing failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00162 return; 00163 } 00164 00165 yield(); 00166 } 00167 } 00168 00169 void NeighborRoutingExtension::notify(const dtn::core::Event *evt) 00170 { 00171 try { 00172 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt); 00173 _taskqueue.push( new ProcessBundleTask(queued.bundle, queued.origin) ); 00174 return; 00175 } catch (const std::bad_cast&) { }; 00176 00177 try { 00178 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00179 const dtn::data::MetaBundle 
&meta = completed.getBundle(); 00180 const dtn::data::EID &peer = completed.getPeer(); 00181 00182 if ((meta.destination.getNode() == peer.getNode()) 00183 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00184 { 00185 try { 00186 dtn::core::BundleStorage &storage = (**this).getStorage(); 00187 00188 // bundle has been delivered to its destination 00189 // delete it from our storage 00190 storage.remove(meta); 00191 00192 IBRCOMMON_LOGGER(notice) << "singleton bundle delivered and removed: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00193 00194 // gen a report 00195 dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE); 00196 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00197 00198 // transfer the next bundle to this destination 00199 _taskqueue.push( new SearchNextBundleTask( peer ) ); 00200 } 00201 return; 00202 } catch (const std::bad_cast&) { }; 00203 00204 try { 00205 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00206 const dtn::data::EID &peer = aborted.getPeer(); 00207 const dtn::data::BundleID &id = aborted.getBundleID(); 00208 00209 switch (aborted.reason) 00210 { 00211 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED: 00212 break; 00213 00214 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED: 00215 break; 00216 00217 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED: 00218 break; 00219 00220 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN: 00221 return; 00222 00223 case dtn::net::TransferAbortedEvent::REASON_REFUSED: 00224 { 00225 try { 00226 const dtn::data::MetaBundle meta = (**this).getStorage().get(id); 00227 00228 // if the bundle has been sent by this module delete it 00229 if ((meta.destination.getNode() == peer.getNode()) 00230 && (meta.procflags & dtn::data::Bundle::DESTINATION_IS_SINGLETON)) 00231 { 00232 // bundle is not deliverable 00233 
(**this).getStorage().remove(id); 00234 } 00235 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }; 00236 } 00237 break; 00238 } 00239 00240 // transfer the next bundle to this destination 00241 _taskqueue.push( new SearchNextBundleTask( peer ) ); 00242 00243 return; 00244 } catch (const std::bad_cast&) { }; 00245 00246 // If a new neighbor comes available, send him a request for the summary vector 00247 // If a neighbor went away we can free the stored summary vector 00248 try { 00249 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00250 const dtn::core::Node &n = nodeevent.getNode(); 00251 00252 if (nodeevent.getAction() == NODE_AVAILABLE) 00253 { 00254 _taskqueue.push( new SearchNextBundleTask( n.getEID() ) ); 00255 } 00256 00257 return; 00258 } catch (const std::bad_cast&) { }; 00259 } 00260 00261 /****************************************/ 00262 00263 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00264 : eid(e) 00265 { } 00266 00267 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00268 { } 00269 00270 std::string NeighborRoutingExtension::SearchNextBundleTask::toString() 00271 { 00272 return "SearchNextBundleTask: " + eid.getString(); 00273 } 00274 00275 /****************************************/ 00276 00277 NeighborRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta, const dtn::data::EID &o) 00278 : bundle(meta), origin(o) 00279 { } 00280 00281 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask() 00282 { } 00283 00284 std::string NeighborRoutingExtension::ProcessBundleTask::toString() 00285 { 00286 return "ProcessBundleTask: " + bundle.toString(); 00287 } 00288 } 00289 }