00001
00002
00003
00004
00005
00006
00007
00008 #include "routing/EpidemicRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "core/BundleExpiredEvent.h"
00012 #include "core/NodeEvent.h"
00013 #include "core/TimeEvent.h"
00014 #include "core/Node.h"
00015 #include "net/ConnectionManager.h"
00016 #include "Configuration.h"
00017 #include "core/BundleCore.h"
00018 #include "core/SimpleBundleStorage.h"
00019
00020 #include <ibrdtn/data/MetaBundle.h>
00021 #include <ibrcommon/thread/MutexLock.h>
00022 #include <ibrcommon/Logger.h>
00023
00024 #include <functional>
00025 #include <list>
00026 #include <algorithm>
00027
00028 #include <iomanip>
00029 #include <ios>
00030 #include <iostream>
00031
00032 #include <stdlib.h>
00033 #include <typeinfo>
00034
00035 namespace dtn
00036 {
00037 namespace routing
00038 {
00039 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00040 bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00041 return n1 == n2;
00042 }
00043 };
00044
00045 EpidemicRoutingExtension::EpidemicRoutingExtension()
00046 {
00047
00048 dtn::daemon::Configuration &conf = dtn::daemon::Configuration::getInstance();
00049
00050
00051 IBRCOMMON_LOGGER(info) << "Initializing epidemic routing module for node " << conf.getNodename() << IBRCOMMON_LOGGER_ENDL;
00052 }
00053
00054 EpidemicRoutingExtension::~EpidemicRoutingExtension()
00055 {
00056 stopExtension();
00057 join();
00058 }
00059
00060 void EpidemicRoutingExtension::stopExtension()
00061 {
00062 _taskqueue.unblock();
00063 }
00064
00065 void EpidemicRoutingExtension::update(std::string &name, std::string &data)
00066 {
00067 name = "epidemic";
00068 data = "version=1";
00069 }
00070
00071 void EpidemicRoutingExtension::notify(const dtn::core::Event *evt)
00072 {
00073 try {
00074 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00075 _taskqueue.push( new ProcessBundleTask(queued.bundle) );
00076 } catch (std::bad_cast ex) { };
00077
00078 try {
00079 const dtn::core::TimeEvent &time = dynamic_cast<const dtn::core::TimeEvent&>(*evt);
00080
00081 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00082 {
00083
00084 _taskqueue.push( new ExpireTask(time.getTimestamp()) );
00085 }
00086 } catch (std::bad_cast ex) { };
00087
00088 try {
00089 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00090
00091 dtn::core::Node n = nodeevent.getNode();
00092 dtn::data::EID eid(n.getURI());
00093
00094 switch (nodeevent.getAction())
00095 {
00096 case NODE_AVAILABLE:
00097 {
00098 ibrcommon::MutexLock l(_list_mutex);
00099 _neighbors.setAvailable(eid);
00100
00101 _taskqueue.push( new UpdateSummaryVectorTask( eid ) );
00102 }
00103 break;
00104
00105 case NODE_UNAVAILABLE:
00106 {
00107 ibrcommon::MutexLock l(_list_mutex);
00108 _neighbors.setUnavailable(eid);
00109 }
00110 break;
00111
00112 default:
00113 break;
00114 }
00115 } catch (std::bad_cast ex) { };
00116
00117 try {
00118 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00119
00120 dtn::data::EID eid = completed.getPeer();
00121 dtn::data::MetaBundle meta = completed.getBundle();
00122
00123 try {
00124
00125 if ( EID(eid.getNodeEID()) == EID(meta.destination) )
00126 {
00127
00128
00129
00130
00131 getRouter()->getStorage().remove(meta);
00132 }
00133 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00134
00135 }
00136
00137
00138 {
00139 ibrcommon::MutexLock l(_list_mutex);
00140 ibrcommon::BloomFilter &bf = _neighbors.get(eid)._filter;
00141 bf.insert(meta.toString());
00142 }
00143
00144
00145 _taskqueue.push( new SearchNextBundleTask( eid ) );
00146
00147 } catch (std::bad_cast ex) { };
00148 }
00149
00150 void EpidemicRoutingExtension::transferEpidemicInformation(const dtn::data::EID &eid)
00151 {
00156 dtn::data::Bundle routingbundle;
00157 routingbundle._lifetime = 10;
00158 routingbundle._source = dtn::core::BundleCore::local;
00159 routingbundle._destination = EID("dtn:epidemic-routing");
00160
00161
00162 {
00163
00164 ibrcommon::MutexLock l(_list_mutex);
00165 EpidemicExtensionBlock &eblock = routingbundle.push_back<EpidemicExtensionBlock>();
00166 const SummaryVector vec = getRouter()->getSummaryVector();
00167 eblock.setSummaryVector(vec);
00168 }
00169
00170 getRouter()->transferTo(eid, routingbundle);
00171 }
00172
00173 void EpidemicRoutingExtension::run()
00174 {
00175 bool sendVectorUpdate = false;
00176
00177 while (true)
00178 {
00179 try {
00180 Task *t = _taskqueue.blockingpop();
00181
00182 try {
00183 ExpireTask &task = dynamic_cast<ExpireTask&>(*t);
00184
00185
00186 if (sendVectorUpdate)
00187 {
00188 sendVectorUpdate = false;
00189 _taskqueue.push( new BroadcastSummaryVectorTask() );
00190 }
00191
00192 ibrcommon::MutexLock l(_list_mutex);
00193 _seenlist.expire(task.timestamp);
00194 } catch (std::bad_cast) { };
00195
00196 try {
00197 SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
00198
00199 ibrcommon::MutexLock l(_list_mutex);
00200 ibrcommon::BloomFilter &bf = _neighbors.get(task.eid)._filter;
00201 dtn::data::Bundle b = getRouter()->getStorage().get(bf);
00202 getRouter()->transferTo(task.eid, b);
00203
00204 } catch (dtn::core::BundleStorage::NoBundleFoundException) {
00205 } catch (std::bad_cast) { };
00206
00207 try {
00208 dynamic_cast<BroadcastSummaryVectorTask&>(*t);
00209
00210 std::set<dtn::data::EID> list;
00211 {
00212 ibrcommon::MutexLock l(_list_mutex);
00213 list = _neighbors.getAvailable();
00214 }
00215
00216 for (std::set<dtn::data::EID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00217 {
00218 transferEpidemicInformation(*iter);
00219 }
00220
00221 } catch (std::bad_cast) { };
00222
00223 try {
00224 UpdateSummaryVectorTask &task = dynamic_cast<UpdateSummaryVectorTask&>(*t);
00225 transferEpidemicInformation(task.eid);
00226 } catch (std::bad_cast) { };
00227
00228 try {
00229 ProcessBundleTask &task = dynamic_cast<ProcessBundleTask&>(*t);
00230 ibrcommon::MutexLock l(_list_mutex);
00231
00232
00233 if (task.bundle.destination == EID("dtn:epidemic-routing"))
00234 {
00235
00236 dtn::data::Bundle bundle = getRouter()->getStorage().get(task.bundle);
00237
00238
00239 getRouter()->getStorage().remove(bundle);
00240
00241 try {
00242
00243 const EpidemicExtensionBlock &ext = bundle.getBlock<EpidemicExtensionBlock>();
00244
00245
00246 const ibrcommon::BloomFilter &filter = ext.getSummaryVector().getBloomFilter();
00247
00248
00249 _neighbors.updateBundles(bundle._source, filter);
00250 } catch (dtn::data::Bundle::NoSuchBlockFoundException) {
00251
00252 }
00253 }
00254 else
00255 {
00256 sendVectorUpdate = true;
00257 }
00258 } catch (dtn::core::BundleStorage::NoBundleFoundException) {
00259
00260 } catch (std::bad_cast) { };
00261
00262 delete t;
00263 } catch (ibrcommon::Exception ex) {
00264 return;
00265 }
00266 }
00267 }
00268
00269 bool EpidemicRoutingExtension::wasSeenBefore(const dtn::data::BundleID &id) const
00270 {
00271 return ( std::find( _seenlist.begin(), _seenlist.end(), id ) != _seenlist.end());
00272 }
00273
00274 dtn::data::Block* EpidemicRoutingExtension::EpidemicExtensionBlock::Factory::create()
00275 {
00276 return new EpidemicRoutingExtension::EpidemicExtensionBlock();
00277 }
00278
00279 EpidemicRoutingExtension::EpidemicExtensionBlock::EpidemicExtensionBlock()
00280 : dtn::data::Block(EpidemicExtensionBlock::BLOCK_TYPE), _data("forwarded through epidemic routing")
00281 {
00282 }
00283
00284 EpidemicRoutingExtension::EpidemicExtensionBlock::~EpidemicExtensionBlock()
00285 {
00286 }
00287
00288 void EpidemicRoutingExtension::EpidemicExtensionBlock::setSummaryVector(const SummaryVector &vector)
00289 {
00290 _vector = vector;
00291 }
00292
00293 const SummaryVector& EpidemicRoutingExtension::EpidemicExtensionBlock::getSummaryVector() const
00294 {
00295 return _vector;
00296 }
00297
00298 void EpidemicRoutingExtension::EpidemicExtensionBlock::set(dtn::data::SDNV value)
00299 {
00300 _counter = value;
00301 }
00302
00303 dtn::data::SDNV EpidemicRoutingExtension::EpidemicExtensionBlock::get() const
00304 {
00305 return _counter;
00306 }
00307
00308 size_t EpidemicRoutingExtension::EpidemicExtensionBlock::getLength() const
00309 {
00310 return _counter.getLength() + _data.getLength() + _vector.getLength();
00311 }
00312
00313 std::istream& EpidemicRoutingExtension::EpidemicExtensionBlock::deserialize(std::istream &stream)
00314 {
00315 stream >> _counter;
00316 stream >> _data;
00317 stream >> _vector;
00318
00319
00320 dtn::data::Block::set(dtn::data::Block::FORWARDED_WITHOUT_PROCESSED, false);
00321
00322 return stream;
00323 }
00324
00325 std::ostream &EpidemicRoutingExtension::EpidemicExtensionBlock::serialize(std::ostream &stream) const
00326 {
00327 stream << _counter;
00328 stream << _data;
00329 stream << _vector;
00330
00331 return stream;
00332 }
00333
00334
00335
00336 EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
00337 : eid(e)
00338 { }
00339
00340 EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
00341 { }
00342
00343 std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
00344 {
00345 return "SearchNextBundleTask: " + eid.getString();
00346 }
00347
00348
00349
00350 EpidemicRoutingExtension::BroadcastSummaryVectorTask::BroadcastSummaryVectorTask()
00351 { }
00352
00353 EpidemicRoutingExtension::BroadcastSummaryVectorTask::~BroadcastSummaryVectorTask()
00354 { }
00355
00356 std::string EpidemicRoutingExtension::BroadcastSummaryVectorTask::toString()
00357 {
00358 return "BroadcastSummaryVectorTask";
00359 }
00360
00361
00362
00363 EpidemicRoutingExtension::UpdateSummaryVectorTask::UpdateSummaryVectorTask(const dtn::data::EID &e)
00364 : eid(e)
00365 { }
00366
00367 EpidemicRoutingExtension::UpdateSummaryVectorTask::~UpdateSummaryVectorTask()
00368 { }
00369
00370 std::string EpidemicRoutingExtension::UpdateSummaryVectorTask::toString()
00371 {
00372 return "UpdateSummaryVectorTask: " + eid.getString();
00373 }
00374
00375
00376
00377 EpidemicRoutingExtension::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &meta)
00378 : bundle(meta)
00379 { }
00380
00381 EpidemicRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
00382 { }
00383
00384 std::string EpidemicRoutingExtension::ProcessBundleTask::toString()
00385 {
00386 return "ProcessBundleTask: " + bundle.toString();
00387 }
00388
00389
00390
00391 EpidemicRoutingExtension::ExpireTask::ExpireTask(const size_t t)
00392 : timestamp(t)
00393 { }
00394
00395 EpidemicRoutingExtension::ExpireTask::~ExpireTask()
00396 { }
00397
00398 std::string EpidemicRoutingExtension::ExpireTask::toString()
00399 {
00400 return "ExpireTask";
00401 }
00402 }
00403 }