IBR-DTN  1.0.0
NodeHandshakeExtension.cpp
Go to the documentation of this file.
1 /*
2  * NodeHandshakeExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
25 
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
28 #include "core/EventDispatcher.h"
29 
30 #include <ibrdtn/data/AgeBlock.h>
32 #include <ibrdtn/utils/Clock.h>
33 
34 #include <ibrcommon/thread/MutexLock.h>
35 #include <ibrcommon/thread/RWLock.h>
36 #include <ibrcommon/Logger.h>
37 
38 namespace dtn
39 {
40  namespace routing
41  {
42  const std::string NodeHandshakeExtension::TAG = "NodeHandshakeExtension";
43  const dtn::data::EID NodeHandshakeExtension::BROADCAST_ENDPOINT("dtn://broadcast.dtn/routing");
44 
46  : _endpoint(*this)
47  {
48  }
49 
51  {
52  }
53 
55  {
57  }
58 
60  {
62  {
63  // add own summary vector to the message
64  const dtn::data::BundleSet vec = (**this).getKnownBundles();
65 
66  // create an item
68 
69  // add it to the handshake
70  answer.addItem(item);
71  }
72 
74  {
75  // add own purge vector to the message
76  const dtn::data::BundleSet vec = (**this).getPurgedBundles();
77 
78  // create an item
80 
81  // add it to the handshake
82  answer.addItem(item);
83  }
84  }
85 
87  {
88  try {
90 
91  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "summary vector received from " << source.getString() << IBRCOMMON_LOGGER_ENDL;
92 
93  // get the summary vector (bloomfilter) of this ECM
94  const ibrcommon::BloomFilter &filter = bfsv.getVector().getBloomFilter();
95 
101  NeighborDatabase &db = (**this).getNeighborDB();
102  ibrcommon::MutexLock l(db);
103  db.get(source.getNode()).update(filter, answer.getLifetime());
104  } catch (std::exception&) { };
105 
106  try {
107  const BloomFilterPurgeVector bfpv = answer.get<BloomFilterPurgeVector>();
108 
109  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "purge vector received from " << source.getString() << IBRCOMMON_LOGGER_ENDL;
110 
111  // get the purge vector (bloomfilter) of this ECM
112  const ibrcommon::BloomFilter &purge = bfpv.getVector().getBloomFilter();
113 
114  // get a reference to the storage
115  dtn::storage::BundleStorage &storage = (**this).getStorage();
116 
117  // create a bundle filter which selects bundles contained in the received
118  // purge vector but not addressed locally
120  {
121  public:
122  BundleFilter(const ibrcommon::BloomFilter &filter)
123  : _filter(filter)
124  {};
125 
126  virtual ~BundleFilter() {};
127 
128  virtual dtn::data::Size limit() const throw () { return 100; };
129 
130  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
131  {
132  // do not select locally addressed bundles
134  return false;
135 
136  // do not purge non-singleton bundles
138  return false;
139 
140  // select the bundle if it is in the filter
141  return meta.isIn(_filter);
142  };
143 
144  const ibrcommon::BloomFilter &_filter;
145  } bundle_filter(purge);
146 
148 
149  // while we are getting more results from the storage
150  do {
151  // delete all previous results
152  list.clear();
153 
154  // query for more bundles
155  storage.get(bundle_filter, list);
156 
157  for (dtn::storage::BundleResultList::const_iterator iter = list.begin(); iter != list.end(); ++iter)
158  {
159  const dtn::data::MetaBundle &meta = (*iter);
160 
161  // delete bundle from storage
162  storage.remove(meta);
163 
164  // log the purged bundle
165  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 10) << "bundle purged: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
166 
167  // gen a report
168  dtn::core::BundleEvent::raise(meta, dtn::core::BUNDLE_DELETED, StatusReportBlock::NO_ADDITIONAL_INFORMATION);
169 
170  // add this bundle to the own purge vector
171  (**this).setPurged(meta);
172  }
173  } while (!list.empty());
174  } catch (std::exception&) { };
175  }
176 
178  {
179  _endpoint.query(eid);
180  }
181 
183  {
184  _endpoint.push(id);
185  }
186 
188  {
190  }
191 
193  {
195  }
196 
198  {
199  // If a new neighbor comes available, send him a request for the summary vector
200  // If a neighbor went away we can free the stored summary vector
201  const dtn::core::Node &n = nodeevent.getNode();
202 
203  if (nodeevent.getAction() == NODE_UNAVAILABLE)
204  {
205  // remove the item from the blacklist
206  _endpoint.removeFromBlacklist(n.getEID());
207  }
208  }
209 
210  NodeHandshakeExtension::HandshakeEndpoint::HandshakeEndpoint(NodeHandshakeExtension &callback)
211  : _callback(callback)
212  {
213  AbstractWorker::initialize("routing");
214  AbstractWorker::subscribe(BROADCAST_ENDPOINT);
215  }
216 
217  NodeHandshakeExtension::HandshakeEndpoint::~HandshakeEndpoint()
218  {
219  }
220 
221  void NodeHandshakeExtension::HandshakeEndpoint::callbackBundleReceived(const Bundle &b)
222  {
223  // do not process bundles send from this endpoint
225 
226  _callback.processHandshake(b);
227  }
228 
229  void NodeHandshakeExtension::HandshakeEndpoint::send(dtn::data::Bundle &b)
230  {
231  transmit(b);
232  }
233 
234  void NodeHandshakeExtension::HandshakeEndpoint::removeFromBlacklist(const dtn::data::EID &eid)
235  {
236  ibrcommon::MutexLock l(_blacklist_lock);
237  _blacklist.erase(eid);
238  }
239 
240  void NodeHandshakeExtension::HandshakeEndpoint::push(const NodeHandshakeItem::IDENTIFIER id)
241  {
242  // create a new notification
243  NodeHandshake notification(NodeHandshake::HANDSHAKE_NOTIFICATION);
244  notification.addRequest(id);
245 
246  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake notification for type: " << id << IBRCOMMON_LOGGER_ENDL;
247 
248  // create a new bundle with a zero timestamp (+age block)
249  dtn::data::Bundle req(true);
250 
251  // set the source of the bundle
252  req.source = getWorkerURI();
253 
254  // set the destination of the bundle
256  req.destination = BROADCAST_ENDPOINT;
257 
258  // limit the lifetime to 60 seconds
259  req.lifetime = 60;
260 
261  // set high priority
264 
265  dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
266  ibrcommon::BLOB::Reference ref = p.getBLOB();
267 
268  // serialize the request into the payload
269  {
270  ibrcommon::BLOB::iostream ios = ref.iostream();
271  (*ios) << notification;
272  }
273 
274  // add a schl block
276  schl.setLimit(1);
277 
278  // send the bundle
279  transmit(req);
280  }
281 
282  void NodeHandshakeExtension::HandshakeEndpoint::query(const dtn::data::EID &origin)
283  {
284  {
285  ibrcommon::MutexLock l(_blacklist_lock);
286  // only query once each 60 seconds
287  if (_blacklist[origin] > dtn::utils::Clock::getMonotonicTimestamp()) return;
288  _blacklist[origin] = dtn::utils::Clock::getMonotonicTimestamp() + 60;
289  }
290 
291  // create a new request for the summary vector of the neighbor
292  NodeHandshake request(NodeHandshake::HANDSHAKE_REQUEST);
293 
294 #ifdef IBRDTN_SUPPORT_COMPRESSION
295  // request compressed answer
297 #endif
298 
299  // walk through all extensions to generate a request
300  (*_callback).requestHandshake(origin, request);
301 
302  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake query from " << origin.getString() << ": " << request.toString() << IBRCOMMON_LOGGER_ENDL;
303 
304  // create a new bundle with a zero timestamp (+age block)
305  dtn::data::Bundle req(true);
306 
307  // set the source of the bundle
308  req.source = getWorkerURI();
309 
310  // set the destination of the bundle
312  req.destination = origin;
313 
314  // set destination application
315  req.destination.setApplication("routing");
316 
317  // limit the lifetime to 60 seconds
318  req.lifetime = 60;
319 
320  // set high priority
323 
324  dtn::data::PayloadBlock &p = req.push_back<PayloadBlock>();
325  ibrcommon::BLOB::Reference ref = p.getBLOB();
326 
327  // serialize the request into the payload
328  {
329  ibrcommon::BLOB::iostream ios = ref.iostream();
330  (*ios) << request;
331  }
332 
333  // add a schl block
335  schl.setLimit(1);
336 
337  // send the bundle
338  transmit(req);
339  }
340 
342  {
343  // read the ecm
345  ibrcommon::BLOB::Reference ref = p.getBLOB();
346  NodeHandshake handshake;
347 
348  // locked within this region
349  {
350  ibrcommon::BLOB::iostream s = ref.iostream();
351  (*s) >> handshake;
352  }
353 
354  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake received from " << bundle.source.getString() << ": " << handshake.toString() << IBRCOMMON_LOGGER_ENDL;
355 
356  // if this is a request answer with an summary vector
357  if (handshake.getType() == NodeHandshake::HANDSHAKE_REQUEST)
358  {
359  // create a new request for the summary vector of the neighbor
361 
362  // lock the extension list during the processing
363  (**this).responseHandshake(bundle.source, handshake, response);
364 
365  IBRCOMMON_LOGGER_DEBUG_TAG(NodeHandshakeExtension::TAG, 15) << "handshake reply to " << bundle.source.getString() << ": " << response.toString() << IBRCOMMON_LOGGER_ENDL;
366 
367  // create a new bundle with a zero timestamp (+age block)
368  dtn::data::Bundle answer(true);
369 
370  // set the source of the bundle
371  answer.source = _endpoint.getWorkerURI();
372 
373  // set the destination of the bundle
375  answer.destination = bundle.source;
376 
377  // limit the lifetime to 60 seconds
378  answer.lifetime = 60;
379 
380  // set high priority
383 
385  ibrcommon::BLOB::Reference ref = p.getBLOB();
386 
387  // serialize the request into the payload
388  {
389  ibrcommon::BLOB::iostream ios = ref.iostream();
390  (*ios) << response;
391  }
392 
393  // add a schl block
395  schl.setLimit(1);
396 
397  // request compression
399 
400  // transfer the bundle to the neighbor
401  _endpoint.send(answer);
402 
403  // call handshake completed event
405  }
406  else if (handshake.getType() == NodeHandshake::HANDSHAKE_RESPONSE)
407  {
408  // walk through all extensions to process the contents of the response
409  (**this).processHandshake(bundle.source, handshake);
410 
411  // call handshake completed event
413  }
414  else if (handshake.getType() == NodeHandshake::HANDSHAKE_NOTIFICATION)
415  {
416  // create a new request
418 
419  // walk through all extensions to find out which requests are of interest
420  (**this).requestHandshake(BROADCAST_ENDPOINT, request);
421 
422  const NodeHandshake::request_set &rs = handshake.getRequests();
423  for (NodeHandshake::request_set::const_iterator it = rs.begin(); it != rs.end(); ++it)
424  {
425  if (request.hasRequest(*it)) {
426  // get node endpoint
427  const dtn::data::EID node = bundle.source.getNode();
428 
429  // clear 60 second blockage
430  _endpoint.removeFromBlacklist(node);
431 
432  // execute handshake
433  _endpoint.query(node);
434  return;
435  }
436  }
437  }
438  }
439  } /* namespace routing */
440 } /* namespace dtn */
std::string toString() const
Definition: BundleID.cpp:190
const std::string toString() const
bool isIn(const ibrcommon::BloomFilter &bf) const
Definition: BundleID.cpp:139
static dtn::data::EID local
Definition: BundleCore.h:79
static void raiseEvent(HANDSHAKE_STATE state, const dtn::data::EID &peer)
bool get(dtn::data::PrimaryBlock::FLAGS flag) const
Definition: MetaBundle.cpp:160
static void add(EventReceiver< E > *receiver)
T & push_front()
Definition: Bundle.h:161
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
void addItem(NodeHandshakeItem *item)
void requestHandshake(const dtn::data::EID &destination, NodeHandshake &request) const
static void remove(const EventReceiver< E > *receiver)
const dtn::data::Number & getLifetime() const
NeighborDatabase::NeighborEntry & get(const dtn::data::EID &eid, bool noCached=false)
bool hasRequest(const dtn::data::Number &identifier) const
virtual void remove(const dtn::data::BundleID &id)=0
void pushHandshakeUpdated(const NodeHandshakeItem::IDENTIFIER id)
MESSAGE_TYPE getType() const
static const dtn::data::Number identifier
Definition: NodeHandshake.h:66
void responseHandshake(const dtn::data::EID &source, const NodeHandshake &request, NodeHandshake &answer)
EID getNode() const
Definition: EID.cpp:528
dtn::data::EID destination
Definition: MetaBundle.h:60
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
const request_set & getRequests() const
void doHandshake(const dtn::data::EID &eid)
const dtn::data::BundleSet & getVector() const
size_t Size
Definition: Number.h:34
std::string getString() const
Definition: EID.cpp:374
void raiseEvent(const dtn::core::NodeEvent &evt)
const dtn::data::BundleSet & getVector() const
T & push_back()
Definition: Bundle.h:180
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
Definition: BundleEvent.cpp:78
ibrcommon::BLOB::Reference getBLOB() const
void processHandshake(const dtn::data::EID &source, NodeHandshake &answer)
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
static const dtn::data::Number identifier
Definition: NodeHandshake.h:84
std::set< dtn::data::Number > request_set
void addRequest(const dtn::data::Number &identifier)
static dtn::data::Timestamp getMonotonicTimestamp()
Definition: Clock.cpp:175
const ibrcommon::BloomFilter & getBloomFilter() const
Definition: BundleSet.cpp:120
void set(FLAGS flag, bool value)
dtn::data::EID source
Definition: BundleID.h:53
static BundleCore & getInstance()
Definition: BundleCore.cpp:82