00001
00002
00003
00004
00005
00006
00007
00008 #include "routing/NeighborRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "core/BundleExpiredEvent.h"
00012 #include "core/NodeEvent.h"
00013 #include "core/Node.h"
00014 #include "net/ConnectionManager.h"
00015 #include "ibrcommon/thread/MutexLock.h"
00016 #include "core/SimpleBundleStorage.h"
00017
00018 #include <functional>
00019 #include <list>
00020 #include <algorithm>
00021 #include <typeinfo>
00022
00023 namespace dtn
00024 {
00025 namespace routing
00026 {
00027 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00028 bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00029 return n1 == n2;
00030 }
00031 };
00032
00033 NeighborRoutingExtension::NeighborRoutingExtension()
00034 : _running(true)
00035 {
00036 try {
00037
00038 dtn::core::SimpleBundleStorage &storage = dynamic_cast<dtn::core::SimpleBundleStorage&>(getRouter()->getStorage());
00039
00040 std::list<dtn::data::BundleID> list = storage.getList();
00041
00042 for (std::list<dtn::data::BundleID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00043 {
00044 try {
00045 dtn::data::MetaBundle meta( storage.get(*iter) );
00046
00047
00048 route( meta );
00049 } catch (dtn::core::BundleStorage::NoBundleFoundException) {
00050
00051 }
00052 }
00053 } catch (std::bad_cast ex) {
00054
00055 }
00056 }
00057
00058 NeighborRoutingExtension::~NeighborRoutingExtension()
00059 {
00060 stopExtension();
00061 join();
00062 }
00063
00064 void NeighborRoutingExtension::stopExtension()
00065 {
00066 ibrcommon::MutexLock l(_wait);
00067 _running = false;
00068 _wait.signal(true);
00069 }
00070
00071 void NeighborRoutingExtension::run()
00072 {
00073 ibrcommon::Mutex l(_wait);
00074
00075 while (_running)
00076 {
00077 while (!_available.empty())
00078 {
00079 EID eid = _available.front();
00080 _available.pop();
00081
00082 if ( _stored_bundles.find(eid) != _stored_bundles.end() )
00083 {
00084 std::queue<dtn::data::BundleID> &bundlequeue = _stored_bundles[eid];
00085
00086 while (!bundlequeue.empty())
00087 {
00088 dtn::data::BundleID id = bundlequeue.front();
00089 bundlequeue.pop();
00090
00091 if ( isNeighbor(eid) )
00092 {
00093 try {
00094 getRouter()->transferTo(eid, id);
00095 } catch (BaseRouter::NoRouteFoundException ex) {
00096 bundlequeue.push(id);
00097 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00098
00099 }
00100 }
00101 else break;
00102 }
00103 }
00104 }
00105
00106 yield();
00107 _wait.wait();
00108 }
00109 }
00110
00111 void NeighborRoutingExtension::notify(const dtn::core::Event *evt)
00112 {
00113 const QueueBundleEvent *queued = dynamic_cast<const QueueBundleEvent*>(evt);
00114 const dtn::core::NodeEvent *nodeevent = dynamic_cast<const dtn::core::NodeEvent*>(evt);
00115 const dtn::net::TransferCompletedEvent *completed = dynamic_cast<const dtn::net::TransferCompletedEvent*>(evt);
00116 const dtn::core::BundleExpiredEvent *expired = dynamic_cast<const dtn::core::BundleExpiredEvent*>(evt);
00117
00118 if (queued != NULL)
00119 {
00120
00121 route(queued->bundle);
00122 }
00123 else if (nodeevent != NULL)
00124 {
00125 dtn::core::Node n = nodeevent->getNode();
00126
00127 switch (nodeevent->getAction())
00128 {
00129 case NODE_AVAILABLE:
00130 {
00131 std::list<dtn::core::Node>::iterator iter = std::find_if(_neighbors.begin(), _neighbors.end(), std::bind2nd( FindNode(), n ) );
00132 if (iter == _neighbors.end())
00133 {
00134 _neighbors.push_back(nodeevent->getNode());
00135 }
00136
00137 ibrcommon::MutexLock l(_wait);
00138 dtn::data::EID eid(n.getURI());
00139 _available.push(eid);
00140 _wait.signal(true);
00141 }
00142 break;
00143
00144 case NODE_UNAVAILABLE:
00145 {
00146 std::list<dtn::core::Node>::iterator iter = std::find_if(_neighbors.begin(), _neighbors.end(), std::bind2nd( FindNode(), n ) );
00147 if (iter != _neighbors.end())
00148 {
00149 _neighbors.erase(iter);
00150 }
00151 }
00152 break;
00153
00154 default:
00155 break;
00156 }
00157 }
00158 else if (expired != NULL)
00159 {
00160 remove(expired->_bundle);
00161 }
00162 else if (completed != NULL)
00163 {
00164 dtn::data::EID eid = completed->getPeer();
00165
00166 if ( _stored_bundles.find(eid) != _stored_bundles.end() )
00167 {
00168
00169 remove(completed->getBundle());
00170 }
00171 }
00172 }
00173
00174 void NeighborRoutingExtension::route(const dtn::data::MetaBundle &meta)
00175 {
00176 try {
00177
00178 dtn::data::EID dest = meta.destination.getNodeEID();
00179
00180 if ( isNeighbor( dest ) )
00181 {
00182 getRouter()->transferTo( dest, meta );
00183 return;
00184 }
00185
00186
00187 std::queue<dtn::data::BundleID> &q = _stored_bundles[dest];
00188
00189
00190 q.push( meta );
00191
00192 } catch (dtn::net::ConnectionNotAvailableException ex) {
00193
00194
00195 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00196
00197 }
00198 }
00199
00200 bool NeighborRoutingExtension::isNeighbor(const dtn::data::EID &eid) const
00201 {
00202 std::list<dtn::core::Node>::const_iterator iter = _neighbors.begin();
00203
00204 while (iter != _neighbors.end())
00205 {
00206 if ( eid.sameHost((*iter).getURI()) )
00207 {
00208 return true;
00209 }
00210
00211 iter++;
00212 }
00213
00214 return false;
00215 }
00216
00217 void NeighborRoutingExtension::remove(const dtn::data::BundleID &id)
00218 {
00219 for (std::map<dtn::data::EID, std::queue<dtn::data::BundleID> >::iterator iter = _stored_bundles.begin(); iter != _stored_bundles.end(); iter++ )
00220 {
00221 std::queue<dtn::data::BundleID> &q = (*iter).second;
00222 std::queue<dtn::data::BundleID> new_queue;
00223
00224 while (!q.empty())
00225 {
00226 dtn::data::BundleID bid = q.front();
00227
00228 if (bid != id)
00229 {
00230 new_queue.push(bid);
00231 }
00232
00233 q.pop();
00234 }
00235
00236 _stored_bundles[(*iter).first] = new_queue;
00237 }
00238 }
00239 }
00240 }