34 #include <ibrcommon/thread/MutexLock.h>
35 #include <ibrcommon/thread/RWLock.h>
36 #include <ibrcommon/Logger.h>
42 const std::string NodeHandshakeExtension::TAG =
"NodeHandshakeExtension";
43 const dtn::data::EID NodeHandshakeExtension::BROADCAST_ENDPOINT(
"dtn://broadcast.dtn/routing");
91 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) <<
"summary vector received from " << source.
getString() << IBRCOMMON_LOGGER_ENDL;
102 ibrcommon::MutexLock l(db);
104 }
catch (std::exception&) { };
109 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) <<
"purge vector received from " << source.
getString() << IBRCOMMON_LOGGER_ENDL;
141 return meta.
isIn(_filter);
144 const ibrcommon::BloomFilter &_filter;
145 } bundle_filter(purge);
155 storage.
get(bundle_filter, list);
157 for (dtn::storage::BundleResultList::const_iterator iter = list.begin(); iter != list.end(); ++iter)
165 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) <<
"bundle purged: " << meta.
toString() << IBRCOMMON_LOGGER_ENDL;
171 (**this).setPurged(meta);
173 }
while (!list.empty());
174 }
catch (std::exception&) { };
179 _endpoint.query(eid);
206 _endpoint.removeFromBlacklist(n.
getEID());
211 : _callback(callback)
213 AbstractWorker::initialize(
"routing");
214 AbstractWorker::subscribe(BROADCAST_ENDPOINT);
217 NodeHandshakeExtension::HandshakeEndpoint::~HandshakeEndpoint()
221 void NodeHandshakeExtension::HandshakeEndpoint::callbackBundleReceived(
const Bundle &b)
226 _callback.processHandshake(b);
234 void NodeHandshakeExtension::HandshakeEndpoint::removeFromBlacklist(
const dtn::data::EID &eid)
236 ibrcommon::MutexLock l(_blacklist_lock);
237 _blacklist.erase(eid);
244 notification.addRequest(
id);
246 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) <<
"handshake notification for type: " <<
id << IBRCOMMON_LOGGER_ENDL;
252 req.source = getWorkerURI();
256 req.destination = BROADCAST_ENDPOINT;
266 ibrcommon::BLOB::Reference ref = p.
getBLOB();
270 ibrcommon::BLOB::iostream ios = ref.iostream();
271 (*ios) << notification;
282 void NodeHandshakeExtension::HandshakeEndpoint::query(
const dtn::data::EID &origin)
285 ibrcommon::MutexLock l(_blacklist_lock);
294 #ifdef IBRDTN_SUPPORT_COMPRESSION
300 (*_callback).requestHandshake(origin, request);
302 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) <<
"handshake query from " << origin.
getString() <<
": " << request.toString() << IBRCOMMON_LOGGER_ENDL;
308 req.source = getWorkerURI();
312 req.destination = origin;
325 ibrcommon::BLOB::Reference ref = p.
getBLOB();
329 ibrcommon::BLOB::iostream ios = ref.iostream();
345 ibrcommon::BLOB::Reference ref = p.
getBLOB();
350 ibrcommon::BLOB::iostream s = ref.iostream();
354 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) <<
"handshake received from " << bundle.
source.
getString() <<
": " << handshake.
toString() << IBRCOMMON_LOGGER_ENDL;
363 (**this).responseHandshake(bundle.
source, handshake, response);
365 IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) <<
"handshake reply to " << bundle.
source.
getString() <<
": " << response.
toString() << IBRCOMMON_LOGGER_ENDL;
371 answer.
source = _endpoint.getWorkerURI();
385 ibrcommon::BLOB::Reference ref = p.
getBLOB();
389 ibrcommon::BLOB::iostream ios = ref.iostream();
401 _endpoint.send(answer);
409 (**this).processHandshake(bundle.
source, handshake);
420 (**this).requestHandshake(BROADCAST_ENDPOINT, request);
423 for (NodeHandshake::request_set::const_iterator it = rs.begin(); it != rs.end(); ++it)
430 _endpoint.removeFromBlacklist(node);
433 _endpoint.query(node);
std::string toString() const
const std::string toString() const
bool isIn(const ibrcommon::BloomFilter &bf) const
static dtn::data::EID local
static void raiseEvent(HANDSHAKE_STATE state, const dtn::data::EID &peer)
static void add(EventReceiver< E > *receiver)
void setApplication(const dtn::data::Number &app)
void addItem(NodeHandshakeItem *item)
void requestHandshake(const dtn::data::EID &destination, NodeHandshake &request) const
static void remove(const EventReceiver< E > *receiver)
const dtn::data::Number & getLifetime() const
NeighborDatabase::NeighborEntry & get(const dtn::data::EID &eid, bool noCached=false)
bool hasRequest(const dtn::data::Number &identifier) const
virtual void remove(const dtn::data::BundleID &id)=0
void pushHandshakeUpdated(const NodeHandshakeItem::IDENTIFIER id)
MESSAGE_TYPE getType() const
static const dtn::data::Number identifier
void responseHandshake(const dtn::data::EID &source, const NodeHandshake &request, NodeHandshake &answer)
void setLimit(const Number &hops)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
const request_set & getRequests() const
void doHandshake(const dtn::data::EID &eid)
const dtn::data::BundleSet & getVector() const
std::string getString() const
void raiseEvent(const dtn::core::NodeEvent &evt)
const dtn::data::BundleSet & getVector() const
iterator find(block_t blocktype)
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
ibrcommon::BLOB::Reference getBLOB() const
void processHandshake(const dtn::data::EID &source, NodeHandshake &answer)
virtual ~NodeHandshakeExtension()
const dtn::data::EID & getEID() const
static const dtn::data::Number identifier
std::set< dtn::data::Number > request_set
void addRequest(const dtn::data::Number &identifier)
static dtn::data::Timestamp getMonotonicTimestamp()
const ibrcommon::BloomFilter & getBloomFilter() const
void set(FLAGS flag, bool value)
static BundleCore & getInstance()