34 #include <ibrcommon/Logger.h>
35 #include <ibrcommon/thread/ThreadsafeReference.h>
45 const std::string ProphetRoutingExtension::TAG =
"ProphetRoutingExtension";
48 float beta,
float gamma,
float delta, ibrcommon::Timer::time_t time_unit, ibrcommon::Timer::time_t i_typ,
50 : _deliveryPredictabilityMap(time_unit, beta, gamma),
51 _forwardingStrategy(strategy), _next_exchange_timeout(next_exchange_timeout), _next_exchange_timestamp(0),
52 _p_encounter_max(p_encounter_max), _p_encounter_first(p_encounter_first),
53 _p_first_threshold(p_first_threshold), _delta(delta), _i_typ(i_typ)
69 if (!routing_d.isDirectory()) ibrcommon::File::createDirectory(routing_d);
72 _persistent_file = routing_d.get(
"prophet.dat");
78 IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, info) <<
"Initializing PRoPHET routing module" << IBRCOMMON_LOGGER_ENDL;
85 delete _forwardingStrategy;
101 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
107 ibrcommon::MutexLock l(_acknowledgementSet);
123 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) <<
"delivery predictability map received from " << neighbor_node.
getString() << IBRCOMMON_LOGGER_ENDL;
130 ibrcommon::MutexLock l(db);
135 updateNeighbor(neighbor_node, neighbor_dp_map);
136 }
catch (std::exception&) { }
141 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) <<
"ack'set received from " << neighbor.
getString() << IBRCOMMON_LOGGER_ENDL;
145 (**this).setKnown(*it);
150 ibrcommon::MutexLock l(_acknowledgementSet);
151 _acknowledgementSet.
merge(neighbor_ack_set);
161 : _ackset(neighbor_ack_set)
174 if(!_ackset.has(meta))
182 } filter(neighbor_ack_set);
185 storage.
get(filter, removeList);
187 for (std::list<dtn::data::MetaBundle>::const_iterator it = removeList.begin(); it != removeList.end(); ++it)
194 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 10) <<
"Bundle removed due to prophet ack: " << meta.
toString() << IBRCOMMON_LOGGER_ENDL;
198 IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, warning) << neighbor.
getString() <<
" requested to purge a bundle with a non-singleton destination: " << meta.
toString() << IBRCOMMON_LOGGER_ENDL;
205 }
catch (std::exception&) { }
211 _taskqueue.push(
new SearchNextBundleTask( peer ) );
220 }
catch (
const std::bad_cast &ex) { };
228 for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); ++iter)
235 eventDataChanged(n.
getEID());
244 ibrcommon::MutexLock l(_acknowledgementSet);
245 _acknowledgementSet.expire(time.getTimestamp());
248 if ((time.getTimestamp().get<
size_t>() % 60) == 0)
251 if (_persistent_file.isValid()) store(_persistent_file);
254 ibrcommon::MutexLock l(_next_exchange_mutex);
257 if ((_next_exchange_timestamp > 0) && (_next_exchange_timestamp < now))
259 _taskqueue.push(
new NextExchangeTask() );
262 _next_exchange_timestamp = now + _next_exchange_timeout;
271 _taskqueue.push(
new SearchNextBundleTask( handshake.peer ) );
280 ibrcommon::MutexLock l(_acknowledgementSet);
281 _acknowledgementSet.add(purge.bundle);
295 if (_persistent_file.exists()) restore(_persistent_file);
301 }
catch (
const ibrcommon::ThreadException &ex) {
302 IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, error) <<
"componentUp failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
313 if (_persistent_file.isValid()) store(_persistent_file);
319 }
catch (
const ibrcommon::ThreadException &ex) {
320 IBRCOMMON_LOGGER_TAG(ProphetRoutingExtension::TAG, error) <<
"componentDown failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
332 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
335 return ibrcommon::ThreadsafeReference<DeliveryPredictabilityMap>(_deliveryPredictabilityMap, _deliveryPredictabilityMap);
340 return ibrcommon::ThreadsafeReference<const DeliveryPredictabilityMap>(_deliveryPredictabilityMap,
const_cast<DeliveryPredictabilityMap&
>(_deliveryPredictabilityMap));
345 return ibrcommon::ThreadsafeReference<const AcknowledgementSet>(_acknowledgementSet,
const_cast<AcknowledgementSet&
>(_acknowledgementSet));
348 void ProphetRoutingExtension::ProphetRoutingExtension::run() throw ()
354 : _entry(entry), _strategy(strategy), _dpm(dpm), _neighbors(neighbors), _plist(plist), _context(context)
359 virtual dtn::data::Size limit()
const throw () {
return _entry.getFreeTransferSlots(); };
364 if (meta.hopcount == 0)
385 if (_entry.eid == meta.destination.getNode())
396 if (_neighbors.find(n) != _neighbors.end())
411 if (_dpm.get(meta.source.getNode()) <= 0.0)
return false;
420 if (_entry.has(meta,
true))
431 if (!_strategy.shallForward(_dpm, meta))
return false;
439 for (dtn::net::ConnectionManager::protocol_list::const_iterator it = _plist.begin(); it != _plist.end(); ++it)
452 static_cast<RoutingResult&
>(result).put(meta, p);
461 const NeighborDatabase::NeighborEntry &_entry;
462 const ForwardingStrategy &_strategy;
463 const DeliveryPredictabilityMap &_dpm;
464 const std::set<dtn::core::Node> &_neighbors;
473 std::set<dtn::core::Node> neighbors;
478 Task *t = _taskqueue.poll();
479 std::auto_ptr<Task> killer(t);
481 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 50) <<
"processing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
490 SearchNextBundleTask &task =
dynamic_cast<SearchNextBundleTask&
>(*t);
497 NeighborDatabase &db = (**this).getNeighborDB();
499 ibrcommon::MutexLock l(db);
500 NeighborDatabase::NeighborEntry &entry = db.get(task.eid,
true);
503 if (!entry.isTransferThresholdReached())
504 throw NeighborDatabase::NoMoreTransfersAvailable();
507 const DeliveryPredictabilityMap &dpm = entry.getDataset<DeliveryPredictabilityMap>();
527 const BundleFilter filter(entry, *_forwardingStrategy, dpm, neighbors, context, plist);
530 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 40) <<
"search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
533 (**this).getSeeker().get(filter, list);
534 }
catch (
const NeighborDatabase::DatasetNotAvailableException&) {
537 (**this).doHandshake(task.eid);
540 (**this).doHandshake(task.eid);
544 for (RoutingResult::const_iterator iter = list.begin(); iter != list.end(); ++iter)
547 transferTo(task.eid, (*iter).first, (*iter).second);
548 }
catch (
const NeighborDatabase::AlreadyInTransitException&) { };
550 }
catch (
const NeighborDatabase::NoMoreTransfersAvailable &ex) {
551 IBRCOMMON_LOGGER_DEBUG_TAG(
TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
552 }
catch (
const NeighborDatabase::EntryNotFoundException &ex) {
553 IBRCOMMON_LOGGER_DEBUG_TAG(
TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
555 IBRCOMMON_LOGGER_DEBUG_TAG(
TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
557 IBRCOMMON_LOGGER_DEBUG_TAG(
TAG, 10) <<
"task " << t->toString() <<
" aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
558 }
catch (
const std::bad_cast&) { }
565 dynamic_cast<NextExchangeTask&
>(*t);
568 std::set<dtn::core::Node>::const_iterator it;
569 for(it = neighbors.begin(); it != neighbors.end(); ++it)
572 (**this).doHandshake(it->getEID());
573 }
catch (
const ibrcommon::Exception &ex) { }
575 }
catch (
const std::bad_cast&) { }
577 }
catch (
const ibrcommon::Exception &ex) {
578 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 20) <<
"task failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
580 }
catch (
const std::exception &ex) {
581 IBRCOMMON_LOGGER_DEBUG_TAG(ProphetRoutingExtension::TAG, 15) <<
"terminated due to " << ex.what() << IBRCOMMON_LOGGER_ENDL;
594 float ProphetRoutingExtension::p_encounter(
const dtn::data::EID &neighbor)
const
596 age_map::const_iterator it = _ageMap.find(neighbor);
597 if(it == _ageMap.end())
600 return _p_encounter_max;
605 #ifdef __DEVELOPMENT_ASSERTIONS__
606 assert(currentTime >= it->second &&
"the ageMap timestamp should be smaller than the current timestamp");
608 if(time_diff > _i_typ)
610 return _p_encounter_max;
614 return _p_encounter_max * time_diff.
get<
float>() / static_cast<float>(_i_typ);
618 void ProphetRoutingExtension::updateNeighbor(
const dtn::data::EID &neighbor,
const DeliveryPredictabilityMap& neighbor_dp_map)
621 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
624 const size_t numOfItems = _deliveryPredictabilityMap.
size();
633 float neighbor_dp = _deliveryPredictabilityMap.
get(neighbor);
635 if (neighbor_dp < _p_first_threshold)
637 neighbor_dp = _p_encounter_first;
641 neighbor_dp += (1 - _delta - neighbor_dp) * p_encounter(neighbor);
644 _deliveryPredictabilityMap.
set(neighbor, neighbor_dp);
645 }
catch (
const DeliveryPredictabilityMap::ValueNotFoundException&) {
646 _deliveryPredictabilityMap.
set(neighbor, _p_encounter_first);
652 _deliveryPredictabilityMap.
update(neighbor, neighbor_dp_map, _p_encounter_first);
655 if (numOfItems < _deliveryPredictabilityMap.
size())
662 void ProphetRoutingExtension::age()
664 _deliveryPredictabilityMap.
age(_p_first_threshold);
670 void ProphetRoutingExtension::store(
const ibrcommon::File &target)
673 std::ofstream output(target.getPath().c_str());
676 if (!output.good())
return;
679 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
682 _deliveryPredictabilityMap.
store(output);
686 ibrcommon::MutexLock l(_acknowledgementSet);
687 output << _acknowledgementSet;
697 for (age_map::const_iterator it = _ageMap.begin(); it != _ageMap.end(); ++it)
708 output << peer_entry << ts;
715 void ProphetRoutingExtension::restore(
const ibrcommon::File &source)
718 std::ifstream input(source.getPath().c_str());
721 if (!input.good())
return;
724 ibrcommon::MutexLock l(_deliveryPredictabilityMap);
727 _deliveryPredictabilityMap.
restore(input);
731 ibrcommon::MutexLock l(_acknowledgementSet);
732 input >> _acknowledgementSet;
740 input >> num_entries;
747 while (input.good() && num_entries > 0)
752 input >> peer_entry >> ts;
755 if (monotonic_now >= (ts - monotonic_diff))
773 ProphetRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(
const dtn::data::EID &eid)
778 ProphetRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
782 std::string ProphetRoutingExtension::SearchNextBundleTask::toString()
const
784 return "SearchNextBundleTask: " + eid.
getString();
787 ProphetRoutingExtension::NextExchangeTask::NextExchangeTask()
791 ProphetRoutingExtension::NextExchangeTask::~NextExchangeTask()
795 std::string ProphetRoutingExtension::NextExchangeTask::toString()
const
797 return "NextExchangeTask";
810 return neighborDPIsGreater(neighbor_dpm, bundle.
destination);
824 nf_map::iterator nf_it = _NF_map.find(
id);
826 if (nf_it == _NF_map.end()) {
827 nf_it = _NF_map.insert(std::make_pair(
id, 0)).first;
837 nf_map::const_iterator nf_it = _NF_map.find(bundle);
838 if(nf_it != _NF_map.end()) {
842 if (NF > _NF_max)
return false;
844 return neighborDPIsGreater(neighbor_dpm, bundle.
destination);
static Configuration & getInstance(bool reset=false)
std::string toString() const
void update(const dtn::data::EID &origin, const DeliveryPredictabilityMap &dpm, const float &p_encounter_first)
const std::set< dtn::core::Node > getNeighbors()
void store(std::ostream &output) const
virtual void eventDataChanged(const dtn::data::EID &peer)
static const dtn::data::Number identifier
static dtn::data::EID local
GTMX_Strategy(unsigned int NF_max)
virtual void componentDown()
static void add(EventReceiver< E > *receiver)
bool sameHost(const std::string &other) const
virtual void processHandshake(const dtn::data::EID &, NodeHandshake &)
std::list< dtn::core::Node::Protocol > protocol_list
void addItem(NodeHandshakeItem *item)
const_iterator begin() const
void set(const dtn::data::EID &neighbor, float value)
This class keeps track of the predictablities to see a specific EID.
virtual void eventTransferCompleted(const dtn::data::EID &peer, const dtn::data::MetaBundle &meta)
static void remove(const EventReceiver< E > *receiver)
virtual ~ProphetRoutingExtension()
NeighborDatabase::NeighborEntry & get(const dtn::data::EID &eid, bool noCached=false)
dtn::net::ConnectionManager & getConnectionManager()
static const dtn::data::Number identifier
bool hasRequest(const dtn::data::Number &identifier) const
BundleFilter::ACTION evaluate(BundleFilter::TABLE table, const FilterContext &context) const
void setProtocol(const dtn::core::Node::Protocol &protocol)
This class is a abstract base class for all prophet forwarding strategies.
const Configuration::Network & getNetwork() const
void setPeer(const dtn::data::EID &endpoint)
ibrcommon::File getPath(string name) const
virtual bool shallForward(const DeliveryPredictabilityMap &neighbor_dpm, const dtn::data::MetaBundle &bundle) const
Set of Acknowledgements, that can be serialized in node handshakes.
virtual void eventBundleQueued(const dtn::data::EID &peer, const dtn::data::MetaBundle &meta)
ibrcommon::ThreadsafeReference< const AcknowledgementSet > getAcknowledgementSet() const
void setMetaBundle(const dtn::data::MetaBundle &data)
virtual void componentUp()
static const dtn::data::Number identifier
The GTMX forwarding strategy. Using this strategy, packets are forwarding, if the neighbor has a high...
ProphetRoutingExtension(ForwardingStrategy *strategy, float p_encounter_max, float p_encounter_first, float p_first_threshold, float beta, float gamma, float delta, size_t time_unit, size_t i_typ, dtn::data::Timestamp next_exchange_timeout)
const protocol_set getSupportedProtocols()
static dtn::data::Timestamp getTime()
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
virtual void responseHandshake(const dtn::data::EID &, const NodeHandshake &, NodeHandshake &)
void putDataset(NeighborDataset &dset)
virtual void requestHandshake(const dtn::data::EID &, NodeHandshake &) const
virtual const std::string getTag() const
virtual void raiseEvent(const dtn::routing::NodeHandshakeEvent &evt)
ibrcommon::ThreadsafeReference< DeliveryPredictabilityMap > getDeliveryPredictabilityMap()
void setProphetRouter(ProphetRoutingExtension *router)
std::string getString() const
void merge(const AcknowledgementSet &)
void restore(std::istream &input)
void addForward(const dtn::data::BundleID &id)
void age(const float &p_first_threshold)
dtn::data::BundleList::const_iterator const_iterator
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
const dtn::data::EID & getEID() const
static void raise(const dtn::data::MetaBundle &meta, REASON_CODE reason=DELIVERED)
void addRequest(const dtn::data::Number &identifier)
static dtn::data::Timestamp getMonotonicTimestamp()
dtn::data::SDNV< Size > Number
float get(const dtn::data::EID &neighbor) const
void setRouting(const dtn::routing::RoutingExtension &routing)
virtual bool shallForward(const DeliveryPredictabilityMap &neighbor_dpm, const dtn::data::MetaBundle &bundle) const
const_iterator end() const
bool doPreferDirect() const
static BundleCore & getInstance()