00001
00002
00003
00004
00005
00006
00007
00008 #include "routing/EpidemicRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "core/BundleExpiredEvent.h"
00012 #include "core/NodeEvent.h"
00013 #include "core/TimeEvent.h"
00014 #include "core/Node.h"
00015 #include "net/ConnectionManager.h"
00016 #include "Configuration.h"
00017 #include "core/BundleCore.h"
00018 #include "core/SimpleBundleStorage.h"
00019
00020 #include <ibrdtn/data/MetaBundle.h>
00021 #include <ibrcommon/thread/MutexLock.h>
00022 #include <ibrcommon/Logger.h>
00023
00024 #include <functional>
00025 #include <list>
00026 #include <algorithm>
00027
00028 #include <iomanip>
00029 #include <ios>
00030 #include <iostream>
00031
00032 #include <stdlib.h>
00033 #include <typeinfo>
00034
00035 namespace dtn
00036 {
00037 namespace routing
00038 {
00039 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00040 bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00041 return n1 == n2;
00042 }
00043 };
00044
00045 EpidemicRoutingExtension::EpidemicRoutingExtension()
00046 {
00047
00048 dtn::daemon::Configuration &conf = dtn::daemon::Configuration::getInstance();
00049
00050
00051 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module for node " << conf.getNodename() << IBRCOMMON_LOGGER_ENDL;
00052 }
00053
00054 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00055 {
00056 stopExtension();
00057 join();
00058 }
00059
00060 void EpidemicRoutingExtension::stopExtension()
00061 {
00062 _taskqueue.unblock();
00063 }
00064
00065 void EpidemicRoutingExtension::update(std::string &name, std::string &data)
00066 {
00067 name = "epidemic";
00068 data = "version=1";
00069 }
00070
00071 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00072 {
00073 try {
00074 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00075 _taskqueue.push( new ProcessBundleTask(queued.bundle) );
00076 } catch (std::bad_cast ex) { };
00077
00078 try {
00079 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00080
00081 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00082 {
00083
00084 _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00085 }
00086 } catch (std::bad_cast ex) { };
00087
00088 try {
00089 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00090
00091 dtn::core::Node n = nodeevent.getNode();
00092 dtn::data::EID eid(n.getURI());
00093
00094 switch (nodeevent.getAction())
00095 {
00096 case NODE_AVAILABLE:
00097 {
00098 ibrcommon::MutexLock l(_list_mutex);
00099 _neighbors.setAvailable(eid);
00100
00101 _taskqueue.push( new UpdateSummaryVectorTask( eid ) );
00102 }
00103 break;
00104
00105 case NODE_UNAVAILABLE:
00106 {
00107 ibrcommon::MutexLock l(_list_mutex);
00108 _neighbors.setUnavailable(eid);
00109 }
00110 break;
00111
00112 default:
00113 break;
00114 }
00115 } catch (std::bad_cast ex) { };
00116
00117
00118
00119
00120
00121 try {
00122 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00123
00124 dtn::data::EID eid = completed.getPeer();
00125 dtn::data::MetaBundle meta = completed.getBundle();
00126
00127 try {
00128
00129 if ( EID(eid.getNodeEID()) == EID(meta.destination) )
00130 {
00131
00132
00133
00134
00135 getRouter()->getStorage().remove(meta);
00136 }
00137 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00138
00139 }
00140
00141
00142 {
00143 ibrcommon::MutexLock l(_list_mutex);
00144 ibrcommon::BloomFilter &bf = _neighbors.get(eid)._filter;
00145 bf.insert(meta.toString());
00146 }
00147
00148
00149 _taskqueue.push( new SearchNextBundleTask( eid ) );
00150
00151 } catch (std::bad_cast ex) { };
00152 }
00153
00154 void EpidemicRoutingExtension::transferEpidemicInformation(const dtn::data::EID &eid)
00155 {
00160 dtn::data::Bundle routingbundle;
00161 routingbundle._lifetime = 10;
00162 routingbundle._source = dtn::core::BundleCore::local;
00163 routingbundle._destination = EID("dtn:epidemic-routing");
00164
00165
00166 {
00167
00168 ibrcommon::MutexLock l(_list_mutex);
00169 EpidemicExtensionBlock &eblock = routingbundle.push_back<EpidemicExtensionBlock>();
00170 const SummaryVector vec = getRouter()->getSummaryVector();
00171 eblock.setSummaryVector(vec);
00172 }
00173
00174 getRouter()->transferTo(eid, routingbundle);
00175 }
00176
00177 void EpidemicRoutingExtension::run()
00178 {
00179 bool sendVectorUpdate = false;
00180
00181 while (true)
00182 {
00183 try {
00184 Task *t = _taskqueue.blockingpop();
00185
00186 try {
00187 ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00188
00189
00190 if (sendVectorUpdate)
00191 {
00192 sendVectorUpdate = false;
00193 _taskqueue.push( new BroadcastSummaryVectorTask() );
00194 }
00195
00196 ibrcommon::MutexLock l(_list_mutex);
00197 _seenlist.expire(task.timestamp);
00198 } catch (std::bad_cast) { };
00199
00200 try {
00201 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00202
00203 ibrcommon::MutexLock l(_list_mutex);
00204 ibrcommon::BloomFilter &bf = _neighbors.get(task.eid)._filter;
00205 dtn::data::Bundle b = getRouter()->getStorage().get(bf);
00206 getRouter()->transferTo(task.eid, b);
00207
00208 } catch (dtn::core::BundleStorage::NoBundleFoundException) {
00209 } catch (std::bad_cast) { };
00210
00211 try {
00212 dynamic_cast<BroadcastSummaryVectorTask&>(*t);
00213
00214 std::set<dtn::data::EID> list;
00215 {
00216 ibrcommon::MutexLock l(_list_mutex);
00217 list = _neighbors.getAvailable();
00218 }
00219
00220 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00221 {
00222 transferEpidemicInformation(*iter);
00223 }
00224
00225 } catch (std::bad_cast) { };
00226
00227 try {
00228 UpdateSummaryVectorTask &task = dynamic_cast<UpdateSummaryVectorTask&>(*t);
00229 transferEpidemicInformation(task.eid);
00230 } catch (std::bad_cast) { };
00231
00232 try {
00233 ProcessBundleTask &task = dynamic_cast<ProcessBundleTask&>(*t);
00234 ibrcommon::MutexLock l(_list_mutex);
00235
00236
00237 if (task.bundle.destination == EID("dtn:epidemic-routing"))
00238 {
00239
00240 dtn::data::Bundle bundle = getRouter()->getStorage().get(task.bundle);
00241
00242
00243 getRouter()->getStorage().remove(bundle);
00244
00245 try {
00246
00247 const EpidemicExtensionBlock &ext = bundle.getBlock<EpidemicExtensionBlock>();
00248
00249
00250 const ibrcommon::BloomFilter &filter = ext.getSummaryVector().getBloomFilter();
00251
00252
00253 _neighbors.updateBundles(bundle._source, filter);
00254 } catch (dtn::data::Bundle::NoSuchBlockFoundException) {
00255
00256 }
00257 }
00258 else
00259 {
00260 sendVectorUpdate = true;
00261 }
00262 } catch (dtn::core::BundleStorage::NoBundleFoundException) {
00263
00264 } catch (std::bad_cast) { };
00265
00266 delete t;
00267 } catch (ibrcommon::Exception ex) {
00268 return;
00269 }
00270 }
00271 }
00272
00273 bool EpidemicRoutingExtension::wasSeenBefore(const dtn::data::BundleID &id) const
00274 {
00275 return ( std::find( _seenlist.begin(), _seenlist.end(), id ) != _seenlist.end());
00276 }
00277
00278 dtn::data::Block* EpidemicRoutingExtension::EpidemicExtensionBlock::Factory::create()
00279 {
00280 return new EpidemicRoutingExtension::EpidemicExtensionBlock();
00281 }
00282
00283 EpidemicRoutingExtension::EpidemicExtensionBlock::EpidemicExtensionBlock()
00284 : dtn::data::Block(EpidemicExtensionBlock::BLOCK_TYPE), _data("forwarded through epidemic routing")
00285 {
00286 }
00287
00288 EpidemicRoutingExtension::EpidemicExtensionBlock::~EpidemicExtensionBlock()
00289 {
00290 }
00291
00292 void EpidemicRoutingExtension::EpidemicExtensionBlock::setSummaryVector(const SummaryVector &vector)
00293 {
00294 _vector = vector;
00295 }
00296
00297 const SummaryVector& EpidemicRoutingExtension::EpidemicExtensionBlock::getSummaryVector() const
00298 {
00299 return _vector;
00300 }
00301
00302 void EpidemicRoutingExtension::EpidemicExtensionBlock::set(dtn::data::SDNV value)
00303 {
00304 _counter = value;
00305 }
00306
00307 dtn::data::SDNV EpidemicRoutingExtension::EpidemicExtensionBlock::get() const
00308 {
00309 return _counter;
00310 }
00311
00312 size_t EpidemicRoutingExtension::EpidemicExtensionBlock::getLength() const
00313 {
00314 return _counter.getLength() + _data.getLength() + _vector.getLength();
00315 }
00316
00317 std::istream& EpidemicRoutingExtension::EpidemicExtensionBlock::deserialize(std::istream &stream)
00318 {
00319 stream >> _counter;
00320 stream >> _data;
00321 stream >> _vector;
00322
00323
00324 dtn::data::Block::set(dtn::data::Block::FORWARDED_WITHOUT_PROCESSED, false);
00325
00326 return stream;
00327 }
00328
00329 std::ostream &EpidemicRoutingExtension::EpidemicExtensionBlock::serialize(std::ostream &stream) const
00330 {
00331 stream << _counter;
00332 stream << _data;
00333 stream << _vector;
00334
00335 return stream;
00336 }
00337
00338
00339
00340 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00341 : eid(e)
00342 { }
00343
00344 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00345 { }
00346
00347 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00348 {
00349 return "SearchNextBundleTask: " + eid.getString();
00350 }
00351
00352
00353
00354 EpidemicRoutingExtension::BroadcastSummaryVectorTask::BroadcastSummaryVectorTask()
00355 { }
00356
00357 EpidemicRoutingExtension::BroadcastSummaryVectorTask::~BroadcastSummaryVectorTask()
00358 { }
00359
00360 std::string EpidemicRoutingExtension::BroadcastSummaryVectorTask::toString()
00361 {
00362 return "BroadcastSummaryVectorTask";
00363 }
00364
00365
00366
00367 EpidemicRoutingExtension::UpdateSummaryVectorTask::UpdateSummaryVectorTask(const dtn::data::EID &e)
00368 : eid(e)
00369 { }
00370
00371 EpidemicRoutingExtension::UpdateSummaryVectorTask::~UpdateSummaryVectorTask()
00372 { }
00373
00374 std::string EpidemicRoutingExtension::UpdateSummaryVectorTask::toString()
00375 {
00376 return "UpdateSummaryVectorTask: " + eid.getString();
00377 }
00378
00379
00380
00381 EpidemicRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta)
00382 : bundle(meta)
00383 { }
00384
00385 EpidemicRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00386 { }
00387
00388 std::string EpidemicRoutingExtension::ProcessBundleTask::toString()
00389 {
00390 return "ProcessBundleTask: " + bundle.toString();
00391 }
00392
00393
00394
00395 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00396 : timestamp(t)
00397 { }
00398
00399 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00400 { }
00401
00402 std::string EpidemicRoutingExtension::ExpireTask::toString()
00403 {
00404 return "ExpireTask";
00405 }
00406 }
00407 }