Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "routing/NeighborRoutingExtension.h"
00009 #include "routing/QueueBundleEvent.h"
00010 #include "net/TransferCompletedEvent.h"
00011 #include "net/TransferAbortedEvent.h"
00012 #include "core/BundleExpiredEvent.h"
00013 #include "core/NodeEvent.h"
00014 #include "core/Node.h"
00015 #include "net/ConnectionManager.h"
00016 #include "ibrcommon/thread/MutexLock.h"
00017 #include "core/SimpleBundleStorage.h"
00018 #include <ibrcommon/Logger.h>
00019
00020 #include <functional>
00021 #include <list>
00022 #include <algorithm>
00023 #include <typeinfo>
00024
00025 namespace dtn
00026 {
00027 namespace routing
00028 {
00029 struct FindNode: public std::binary_function< dtn::core::Node, dtn::core::Node, bool > {
00030 bool operator () ( const dtn::core::Node &n1, const dtn::core::Node &n2 ) const {
00031 return n1 == n2;
00032 }
00033 };
00034
00035 NeighborRoutingExtension::NeighborRoutingExtension()
00036 {
00037 try {
00038
00039 dtn::core::SimpleBundleStorage &storage = dynamic_cast<dtn::core::SimpleBundleStorage&>(getRouter()->getStorage());
00040
00041 std::list<dtn::data::BundleID> list = storage.getList();
00042
00043 for (std::list<dtn::data::BundleID>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00044 {
00045 try {
00046 dtn::data::MetaBundle meta( storage.get(*iter) );
00047
00048
00049 route( meta );
00050 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00051
00052 }
00053 }
00054 } catch (std::bad_cast ex) {
00055
00056 }
00057 }
00058
00059 NeighborRoutingExtension::~NeighborRoutingExtension()
00060 {
00061 stop();
00062 join();
00063 }
00064
00065 void NeighborRoutingExtension::stopExtension()
00066 {
00067 _available.abort();
00068 }
00069
00070 bool NeighborRoutingExtension::__cancellation()
00071 {
00072 _available.abort();
00073 return true;
00074 }
00075
00076 void NeighborRoutingExtension::run()
00077 {
00078 while (true)
00079 {
00080 EID eid = _available.getnpop(true);
00081
00082 ibrcommon::MutexLock l(_stored_bundles_lock);
00083 if ( _stored_bundles.find(eid) != _stored_bundles.end() )
00084 {
00085 std::queue<dtn::data::BundleID> &bundlequeue = _stored_bundles[eid];
00086
00087 while (!bundlequeue.empty())
00088 {
00089 dtn::data::BundleID id = bundlequeue.front();
00090 bundlequeue.pop();
00091
00092 if ( isNeighbor(eid) )
00093 {
00094 try {
00095 getRouter()->transferTo(eid, id);
00096 } catch (BaseRouter::NoRouteFoundException ex) {
00097 bundlequeue.push(id);
00098 }
00099 }
00100 else break;
00101 }
00102 }
00103
00104 yield();
00105 }
00106 }
00107
00108 void NeighborRoutingExtension::notify(const dtn::core::Event *evt)
00109 {
00110 try {
00111 const QueueBundleEvent &queued = dynamic_cast<const QueueBundleEvent&>(*evt);
00112
00113
00114 route(queued.bundle);
00115
00116 return;
00117 } catch (std::bad_cast ex) { };
00118
00119 try {
00120 const dtn::net::TransferCompletedEvent &completed = dynamic_cast<const dtn::net::TransferCompletedEvent&>(*evt);
00121
00122 dtn::data::EID eid = completed.getPeer();
00123
00124 ibrcommon::MutexLock l(_stored_bundles_lock);
00125 if ( _stored_bundles.find(eid) != _stored_bundles.end() )
00126 {
00127
00128 remove(completed.getBundle());
00129 }
00130
00131 return;
00132 } catch (std::bad_cast ex) { };
00133
00134 try {
00135 const dtn::net::TransferAbortedEvent &aborted = dynamic_cast<const dtn::net::TransferAbortedEvent&>(*evt);
00136 dtn::data::EID eid = aborted.getPeer();
00137 dtn::data::BundleID id = aborted.getBundleID();
00138
00139 switch (aborted.reason)
00140 {
00141 default:
00142 case dtn::net::TransferAbortedEvent::REASON_UNDEFINED:
00143 case dtn::net::TransferAbortedEvent::REASON_RETRY_LIMIT_REACHED:
00144 case dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN:
00145 {
00146 ibrcommon::MutexLock l(_stored_bundles_lock);
00147
00148 std::queue<dtn::data::BundleID> &q = _stored_bundles[eid];
00149 q.push(id);
00150
00151 break;
00152 }
00153
00154 case dtn::net::TransferAbortedEvent::REASON_REFUSED:
00155 case dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED:
00156 {
00157 ibrcommon::MutexLock l(_stored_bundles_lock);
00158 if ( _stored_bundles.find(eid) != _stored_bundles.end() )
00159 {
00160
00161 remove(id);
00162 }
00163
00164 break;
00165 }
00166 }
00167
00168 return;
00169 } catch (std::bad_cast ex) { };
00170
00171 try {
00172 const dtn::core::NodeEvent &nodeevent = dynamic_cast<const dtn::core::NodeEvent&>(*evt);
00173
00174 dtn::core::Node n = nodeevent.getNode();
00175
00176 switch (nodeevent.getAction())
00177 {
00178 case NODE_AVAILABLE:
00179 {
00180 std::list<dtn::core::Node>::iterator iter = std::find_if(_neighbors.begin(), _neighbors.end(), std::bind2nd( FindNode(), n ) );
00181 if (iter == _neighbors.end())
00182 {
00183 _neighbors.push_back(nodeevent.getNode());
00184 }
00185
00186 dtn::data::EID eid(n.getURI());
00187 _available.push(eid);
00188 }
00189 break;
00190
00191 case NODE_UNAVAILABLE:
00192 {
00193 std::list<dtn::core::Node>::iterator iter = std::find_if(_neighbors.begin(), _neighbors.end(), std::bind2nd( FindNode(), n ) );
00194 if (iter != _neighbors.end())
00195 {
00196 _neighbors.erase(iter);
00197 }
00198 }
00199 break;
00200
00201 default:
00202 break;
00203 }
00204 return;
00205 } catch (std::bad_cast ex) { };
00206
00207 try {
00208 const dtn::core::BundleExpiredEvent &expired = dynamic_cast<const dtn::core::BundleExpiredEvent&>(*evt);
00209
00210 ibrcommon::MutexLock l(_stored_bundles_lock);
00211 remove(expired._bundle);
00212
00213 return;
00214 } catch (std::bad_cast ex) { };
00215 }
00216
00217 void NeighborRoutingExtension::route(const dtn::data::MetaBundle &meta)
00218 {
00219 try {
00220
00221 dtn::data::EID dest = meta.destination.getNodeEID();
00222
00223 if ( isNeighbor( dest ) )
00224 {
00225 getRouter()->transferTo( dest, meta );
00226 return;
00227 }
00228
00229
00230 std::queue<dtn::data::BundleID> &q = _stored_bundles[dest];
00231
00232
00233 q.push( meta );
00234
00235 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00236
00237 IBRCOMMON_LOGGER(warning) << "bundle forward aborted: connection to host not available" << IBRCOMMON_LOGGER_ENDL;
00238 }
00239 }
00240
00241 bool NeighborRoutingExtension::isNeighbor(const dtn::data::EID &eid) const
00242 {
00243 std::list<dtn::core::Node>::const_iterator iter = _neighbors.begin();
00244
00245 while (iter != _neighbors.end())
00246 {
00247 if ( eid.sameHost((*iter).getURI()) )
00248 {
00249 return true;
00250 }
00251
00252 iter++;
00253 }
00254
00255 return false;
00256 }
00257
00258 void NeighborRoutingExtension::remove(const dtn::data::BundleID &id)
00259 {
00260 for (std::map<dtn::data::EID, std::queue<dtn::data::BundleID> >::iterator iter = _stored_bundles.begin(); iter != _stored_bundles.end(); iter++ )
00261 {
00262 std::queue<dtn::data::BundleID> &q = (*iter).second;
00263 std::queue<dtn::data::BundleID> new_queue;
00264
00265 while (!q.empty())
00266 {
00267 dtn::data::BundleID bid = q.front();
00268
00269 if (bid != id)
00270 {
00271 new_queue.push(bid);
00272 }
00273
00274 q.pop();
00275 }
00276
00277 _stored_bundles[(*iter).first] = new_queue;
00278 }
00279 }
00280 }
00281 }