00001 /* 00002 * FloodRoutingExtension.cpp 00003 * 00004 * Created on: 18.02.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "routing/FloodRoutingExtension.h" 00009 #include "routing/QueueBundleEvent.h" 00010 #include "net/TransferCompletedEvent.h" 00011 #include "net/TransferAbortedEvent.h" 00012 #include "core/BundleExpiredEvent.h" 00013 #include "core/NodeEvent.h" 00014 #include "core/TimeEvent.h" 00015 #include "core/Node.h" 00016 #include "net/ConnectionManager.h" 00017 #include "Configuration.h" 00018 #include "core/BundleCore.h" 00019 #include "core/SimpleBundleStorage.h" 00020 00021 #include <ibrdtn/data/MetaBundle.h> 00022 #include <ibrcommon/thread/MutexLock.h> 00023 #include <ibrcommon/Logger.h> 00024 #include <ibrcommon/AutoDelete.h> 00025 00026 #include <functional> 00027 #include <list> 00028 #include <algorithm> 00029 00030 #include <iomanip> 00031 #include <ios> 00032 #include <iostream> 00033 00034 #include <stdlib.h> 00035 #include <typeinfo> 00036 00037 namespace dtn 00038 { 00039 namespace routing 00040 { 00041 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > { 00042 bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const { 00043 return n1 == n2; 00044 } 00045 }; 00046 00047 FloodRoutingExtension::FloodRoutingExtension() 00048 { 00049 // get configuration 00050 dtn::daemon::Configuration &conf = dtn::daemon::Configuration::getInstance(); 00051 00052 // write something to the syslog 00053 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module for node " << conf.getNodename() << IBRCOMMON_LOGGER_ENDL; 00054 } 00055 00056 FloodRoutingExtension::~FloodRoutingExtension() 00057 { 00058 stop(); 00059 join(); 00060 } 00061 00062 void FloodRoutingExtension::stopExtension() 00063 { 00064 _taskqueue.abort(); 00065 } 00066 00067 void FloodRoutingExtension::notify(const dtn::core::Event *evt) 00068 { 00069 try { 00070 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt); 00071 _taskqueue.push( new ProcessBundleTask(queued.bundle) ); 00072 } catch (std::bad_cast ex) { }; 00073 00074 try { 00075 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt); 00076 00077 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00078 { 00079 // check all lists for expired entries 00080 _taskqueue.push( new ExpireTask(time.getTimestamp()) ); 00081 } 00082 } catch (std::bad_cast ex) { }; 00083 00084 try { 00085 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt); 00086 00087 dtn::core::Node n = nodeevent.getNode(); 00088 dtn::data::EID eid(n.getURI()); 00089 00090 switch (nodeevent.getAction()) 00091 { 00092 case NODE_AVAILABLE: 00093 { 00094 ibrcommon::MutexLock l(_list_mutex); 00095 _neighbors.setAvailable(eid); 00096 00097 _taskqueue.push( new SearchNextBundleTask( eid ) ); 00098 } 00099 break; 00100 00101 case NODE_UNAVAILABLE: 00102 { 00103 ibrcommon::MutexLock l(_list_mutex); 00104 _neighbors.setUnavailable(eid); 00105 } 00106 break; 00107 00108 default: 00109 break; 00110 } 00111 } catch (std::bad_cast ex) { }; 00112 00113 // TransferAbortedEvent: send next bundle 00114 try { 00115 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt); 00116 dtn::data::EID eid = aborted.getPeer(); 00117 dtn::data::BundleID id = aborted.getBundleID(); 00118 00119 switch (aborted.reason) 00120 { 00121 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED: 00122 { 00123 break; 00124 } 00125 00126 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED: 00127 { 00128 // transfer the next bundle to this destination 00129 _taskqueue.push( new SearchNextBundleTask( eid ) ); 00130 break; 00131 } 00132 00133 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN: 00134 { 00135 break; 00136 } 00137 00138 // How to exclude several bundles? Combine two bloomfilters? Use a blocklist. Delay transfers to the node? 00139 case dtn::net::TransferAbortedEvent::REASON_REFUSED: 00140 { 00141 // add the transferred bundle to the bloomfilter of the receiver 00142 ibrcommon::MutexLock l(_list_mutex); 00143 ibrcommon::BloomFilter &bf = _neighbors.get(eid)._filter; 00144 bf.insert(id.toString()); 00145 00146 if (IBRCOMMON_LOGGER_LEVEL >= 40) 00147 { 00148 IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL; 00149 } 00150 00151 // transfer the next bundle to this destination 00152 _taskqueue.push( new SearchNextBundleTask( eid ) ); 00153 break; 00154 } 00155 00156 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED: 00157 { 00158 // transfer the next bundle to this destination 00159 _taskqueue.push( new SearchNextBundleTask( eid ) ); 00160 break; 00161 } 00162 } 00163 } catch (std::bad_cast ex) { }; 00164 00165 try { 00166 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt); 00167 00168 dtn::data::EID eid = completed.getPeer(); 00169 dtn::data::MetaBundle meta = completed.getBundle(); 00170 00171 // delete the bundle in the storage if 00172 if ( EID(eid.getNodeEID()) == EID(meta.destination.getNodeEID()) ) 00173 { 00174 try { 00175 // bundle has been delivered to its destination 00176 // TODO: generate a "delete" message for routing algorithm 00177 00178 // delete it from our storage 00179 getRouter()->getStorage().remove(meta); 00180 00181 IBRCOMMON_LOGGER_DEBUG(15) << "bundle delivered: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00182 00183 // add it to the purge vector 00184 _purge_vector.add(meta); 00185 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00186 00187 } 00188 } 00189 else 00190 { 00191 // add the transferred bundle to the bloomfilter of the receiver 00192 ibrcommon::MutexLock l(_list_mutex); 00193 ibrcommon::BloomFilter &bf = _neighbors.get(eid)._filter; 00194 bf.insert(meta.toString()); 00195 00196 if (IBRCOMMON_LOGGER_LEVEL >= 40) 00197 { 00198 IBRCOMMON_LOGGER_DEBUG(40) << "bloomfilter false-positive propability is " << bf.getAllocation() << IBRCOMMON_LOGGER_ENDL; 00199 } 00200 } 00201 00202 // transfer the next bundle to this destination 00203 _taskqueue.push( new SearchNextBundleTask( eid ) ); 00204 00205 } catch (std::bad_cast ex) { }; 00206 } 00207 00208 bool FloodRoutingExtension::__cancellation() 00209 { 00210 _taskqueue.abort(); 00211 return true; 00212 } 00213 00214 void FloodRoutingExtension::run() 00215 { 00216 while (true) 00217 { 00218 try { 00219 Task *t = _taskqueue.getnpop(true); 00220 ibrcommon::AutoDelete<Task> killer(t); 00221 00222 IBRCOMMON_LOGGER_DEBUG(50) << "processing flooding task " << t->toString() << IBRCOMMON_LOGGER_ENDL; 00223 00224 try { 00225 try { 00226 ExpireTask &task = dynamic_cast<ExpireTask&>(*t); 00227 00228 ibrcommon::MutexLock l(_list_mutex); 00229 _purge_vector.expire(task.timestamp); 00230 } catch (std::bad_cast) { }; 00231 00232 try { 00233 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t); 00234 00235 IBRCOMMON_LOGGER_DEBUG(40) << "search one bundle not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL; 00236 00237 ibrcommon::MutexLock l(_list_mutex); 00238 ibrcommon::BloomFilter &bf = _neighbors.get(task.eid)._filter; 00239 const dtn::data::BundleID b = getRouter()->getStorage().getByFilter(bf); 00240 getRouter()->transferTo(task.eid, b); 00241 00242 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00243 } catch (std::bad_cast) { }; 00244 00245 try { 00246 dynamic_cast<ProcessBundleTask&>(*t); 00247 ibrcommon::MutexLock l(_list_mutex); 00248 00249 // trigger transmission for all neighbors 00250 std::set<dtn::data::EID> list; 00251 { 00252 list = _neighbors.getAvailable(); 00253 } 00254 00255 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00256 { 00257 // transfer the next bundle to this destination 00258 _taskqueue.push( new SearchNextBundleTask( *iter ) ); 00259 } 00260 } catch (dtn::data::Bundle::NoSuchBlockFoundException) { 00261 // if the bundle does not contains the expected block 00262 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { 00263 // if the bundle is not in the storage we have nothing to do 00264 } catch (std::bad_cast) { }; 00265 } catch (ibrcommon::Exception ex) { 00266 IBRCOMMON_LOGGER(error) << "Exception occurred in FloodRoutingExtension: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00267 } 00268 } catch (std::exception) { 00269 return; 00270 } 00271 00272 yield(); 00273 } 00274 } 00275 00276 /****************************************/ 00277 00278 FloodRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e) 00279 : eid(e) 00280 { } 00281 00282 FloodRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask() 00283 { } 00284 00285 std::string FloodRoutingExtension::SearchNextBundleTask::toString() 00286 { 00287 return "SearchNextBundleTask: " + eid.getString(); 00288 } 00289 00290 /****************************************/ 00291 00292 FloodRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta) 00293 : bundle(meta) 00294 { } 00295 00296 FloodRoutingExtension::ProcessBundleTask::~ProcessBundleTask() 00297 { } 00298 00299 std::string FloodRoutingExtension::ProcessBundleTask::toString() 00300 { 00301 return "ProcessBundleTask: " + bundle.toString(); 00302 } 00303 00304 /****************************************/ 00305 00306 FloodRoutingExtension::ExpireTask::ExpireTask(const size_t t) 00307 : timestamp(t) 00308 { } 00309 00310 FloodRoutingExtension::ExpireTask::~ExpireTask() 00311 { } 00312 00313 std::string FloodRoutingExtension::ExpireTask::toString() 00314 { 00315 return "ExpireTask"; 00316 } 00317 } 00318 }
1.7.1