32 #include "ibrcommon/thread/MutexLock.h"
35 #include <ibrcommon/Logger.h>
51 const std::string NeighborRoutingExtension::TAG =
"NeighborRoutingExtension";
77 : _extension(e), _entry(entry), _plist(plist)
82 virtual dtn::data::Size limit()
const throw () {
return _entry.getFreeTransferSlots(); };
87 std::pair<bool, dtn::core::Node::Protocol> ret = _extension.shouldRouteTo(meta, _entry, _plist);
90 if (ret.first)
static_cast<RoutingResult&
>(result).put(meta, ret.second);
97 const std::string getWhere()
const throw ()
99 return "destination LIKE ?";
102 int bind(sqlite3_stmt *st,
int offset)
const throw ()
104 const std::string d = _entry.eid.getNode().getString() +
"%";
105 sqlite3_bind_text(st, offset, d.c_str(),
static_cast<int>(d.size()), SQLITE_TRANSIENT);
123 Task *t = _taskqueue.poll();
124 std::auto_ptr<Task> killer(t);
126 IBRCOMMON_LOGGER_DEBUG_TAG(NeighborRoutingExtension::TAG, 5) <<
"processing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
134 SearchNextBundleTask &task =
dynamic_cast<SearchNextBundleTask&
>(*t);
142 ibrcommon::MutexLock l(db);
157 (**this).getSeeker().get(filter, list);
160 IBRCOMMON_LOGGER_DEBUG_TAG(NeighborRoutingExtension::TAG, 5) <<
"got " << list.size() <<
" items to transfer to " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
163 for (RoutingResult::const_iterator iter = list.begin(); iter != list.end(); ++iter)
167 transferTo(task.eid, (*iter).first, (*iter).second);
171 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
173 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
175 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
177 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
178 }
catch (
const std::bad_cast&) { };
184 const ProcessBundleTask &task =
dynamic_cast<ProcessBundleTask&
>(*t);
187 std::pair<bool, dtn::core::Node::Protocol> ret;
196 ibrcommon::MutexLock l(db);
199 ret = shouldRouteTo(task.bundle, entry, plist);
204 transferTo(task.nexthop, task.bundle, ret.second);
206 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
208 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
210 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
212 IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
215 }
catch (
const std::bad_cast&) { };
216 }
catch (
const std::exception &ex) {
217 IBRCOMMON_LOGGER_DEBUG_TAG(NeighborRoutingExtension::TAG, 15) <<
"terminated due to " << ex.what() << IBRCOMMON_LOGGER_ENDL;
266 for (dtn::net::ConnectionManager::protocol_list::const_iterator it = plist.begin(); it != plist.end(); ++it)
279 return make_pair(
true, p);
289 _taskqueue.push(
new SearchNextBundleTask( peer ) );
297 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); ++iter)
303 _taskqueue.push(
new ProcessBundleTask(meta, peer, n.
getEID()) );
317 }
catch (
const ibrcommon::ThreadException &ex) {
318 IBRCOMMON_LOGGER_TAG(NeighborRoutingExtension::TAG, error) <<
"componentUp failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
329 }
catch (
const ibrcommon::ThreadException &ex) {
330 IBRCOMMON_LOGGER_TAG(NeighborRoutingExtension::TAG, error) <<
"componentDown failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
341 NeighborRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(
const dtn::data::EID &e)
345 NeighborRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
348 std::string NeighborRoutingExtension::SearchNextBundleTask::toString()
350 return "SearchNextBundleTask: " + eid.getString();
356 : bundle(meta), origin(o), nexthop(n)
359 NeighborRoutingExtension::ProcessBundleTask::~ProcessBundleTask()
362 std::string NeighborRoutingExtension::ProcessBundleTask::toString()
364 return "ProcessBundleTask: " + bundle.toString();
const std::set< dtn::core::Node > getNeighbors()
static dtn::data::EID local
bool sameHost(const std::string &other) const
std::list< dtn::core::Node::Protocol > protocol_list
bool isTransferThresholdReached() const
bool has(const dtn::data::BundleID &, const bool require_bloomfilter=false) const
NeighborDatabase::NeighborEntry & get(const dtn::data::EID &eid, bool noCached=false)
dtn::net::ConnectionManager & getConnectionManager()
BundleFilter::ACTION evaluate(BundleFilter::TABLE table, const FilterContext &context) const
NeighborRoutingExtension()
void setProtocol(const dtn::core::Node::Protocol &protocol)
void setPeer(const dtn::data::EID &endpoint)
void transferTo(const dtn::data::EID &destination, const dtn::data::MetaBundle &meta, const dtn::core::Node::Protocol)
void setMetaBundle(const dtn::data::MetaBundle &data)
const protocol_set getSupportedProtocols()
virtual const std::string getTag() const
virtual ~NeighborRoutingExtension()
const dtn::data::EID & getEID() const
void setRouting(const dtn::routing::RoutingExtension &routing)
virtual void eventBundleQueued(const dtn::data::EID &peer, const dtn::data::MetaBundle &meta)
virtual void eventDataChanged(const dtn::data::EID &peer)
static BundleCore & getInstance()