IBR-DTN  1.0.0
EpidemicRoutingExtension.cpp
Go to the documentation of this file.
1 /*
2  * EpidemicRoutingExtension.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 
27 #include "core/NodeEvent.h"
28 #include "core/Node.h"
29 #include "net/ConnectionEvent.h"
30 #include "net/ConnectionManager.h"
31 #include "Configuration.h"
32 #include "core/BundleCore.h"
33 #include "core/EventDispatcher.h"
34 #include "core/BundleEvent.h"
35 
36 #include <ibrdtn/data/MetaBundle.h>
37 #include <ibrcommon/thread/MutexLock.h>
38 #include <ibrcommon/Logger.h>
39 
40 #include <functional>
41 #include <list>
42 #include <algorithm>
43 
44 #include <iomanip>
45 #include <ios>
46 #include <iostream>
47 #include <set>
48 #include <memory>
49 
50 #include <stdlib.h>
51 #include <typeinfo>
52 
53 namespace dtn
54 {
55  namespace routing
56  {
// Tag handed to the IBRCOMMON_LOGGER_TAG / IBRCOMMON_LOGGER_DEBUG_TAG macros
// throughout this file to label the log entries written by this module.
57  const std::string EpidemicRoutingExtension::TAG = "EpidemicRoutingExtension";
58 
// NOTE(review): the signature belonging to this body (listing line 59) is
// missing from this extract; presumably the EpidemicRoutingExtension
// constructor - confirm against the original file.
// The body only emits an info-level log entry.
60  {
61  // write something to the syslog
62  IBRCOMMON_LOGGER_TAG(EpidemicRoutingExtension::TAG, info) << "Initializing epidemic routing module" << IBRCOMMON_LOGGER_ENDL;
63  }
64 
// NOTE(review): the signature belonging to this body (listing line 65) is
// missing from this extract; presumably the EpidemicRoutingExtension
// destructor - confirm against the original file.
66  {
// presumably blocks until the worker thread started in componentUp() has
// ended - confirm against the thread base class
67  join();
68  }
69 
// NOTE(review): both the signature (listing line 70) and the only statement
// of this body (listing line 72) are missing from this extract. The reference
// list at the end of the page names requestHandshake(), addRequest() and a
// static `identifier`, so this is presumably requestHandshake() registering a
// request on the handshake object - confirm against the original file.
71  {
73  }
74 
// NOTE(review): the signature belonging to this body (listing line 75) is
// missing from this extract; the body reads a `peer` value and
// eventDataChanged() is called elsewhere in this file with an EID, so this
// is presumably eventDataChanged(const dtn::data::EID &peer) - confirm.
//
// Queues a SearchNextBundleTask for `peer`. The task is heap-allocated here;
// the worker loop further below takes it from _taskqueue and wraps it in a
// std::auto_ptr, which deletes it.
76  {
77  // transfer the next bundle to this destination
78  _taskqueue.push( new SearchNextBundleTask( peer ) );
79  }
80 
// NOTE(review): the signature belonging to this body (listing line 81) is
// missing from this extract; the reference list at the end of the page names
// eventBundleQueued(const dtn::data::EID &peer, const dtn::data::MetaBundle &meta),
// which matches the `peer` value used here - confirm.
//
// Calls eventDataChanged() once for every current neighbor whose EID differs
// from `peer`.
82  {
83  // new bundles trigger a recheck for all neighbors
// getNeighbors() returns the set by value, so the loop iterates over a local copy
84  const std::set<dtn::core::Node> nl = dtn::core::BundleCore::getInstance().getConnectionManager().getNeighbors();
85 
86  for (std::set<dtn::core::Node>::const_iterator iter = nl.begin(); iter != nl.end(); ++iter)
87  {
88  const dtn::core::Node &n = (*iter);
89 
// `peer` itself is skipped; presumably it is the node the bundle was
// received from - TODO confirm
90  if (n.getEID() != peer)
91  {
92  // trigger all routing modules to search for bundles to forward
93  eventDataChanged(n.getEID());
94  }
95  }
96  }
97 
// NOTE(review): the signature belonging to this body (listing line 98) is
// missing from this extract; the reference list at the end of the page names
// raiseEvent(const dtn::routing::NodeHandshakeEvent &evt), and the body reads
// `handshake.state` / `handshake.peer`, so this is presumably that handler
// with the parameter named `handshake` - confirm.
//
// Queues a SearchNextBundleTask for the handshake peer, but only when the
// event reports HANDSHAKE_COMPLETED; every other state is ignored.
99  {
100  if (handshake.state == NodeHandshakeEvent::HANDSHAKE_COMPLETED)
101  {
102  // transfer the next bundle to this destination
103  _taskqueue.push( new SearchNextBundleTask( handshake.peer ) );
104  }
105  }
106 
// NOTE(review): the signature belonging to this body (listing line 107) and
// its first statement (listing line 109) are missing from this extract. The
// log message below identifies the function as componentUp; the missing
// statement is presumably an event-receiver registration (the reference list
// names EventDispatcher add()) - confirm against the original file.
//
// Resets the task queue and starts the worker thread. A ThreadException from
// start() is logged and not propagated.
108  {
110 
111  // reset the task queue
112  _taskqueue.reset();
113 
114  // routine checked for throw() on 15.02.2013
115  try {
116  // run the thread
117  start();
118  } catch (const ibrcommon::ThreadException &ex) {
119  IBRCOMMON_LOGGER_TAG(EpidemicRoutingExtension::TAG, error) << "componentUp failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
120  }
121  }
122 
// NOTE(review): the signature belonging to this body (listing line 123) and
// its first statement (listing line 125) are missing from this extract. The
// log message below identifies the function as componentDown; the missing
// statement is presumably the matching event-receiver removal (the reference
// list names EventDispatcher remove()) - confirm against the original file.
//
// Stops the worker thread and joins it. A ThreadException from either call is
// logged and not propagated.
124  {
126 
127  try {
128  // stop the thread
129  stop();
130  join();
131  } catch (const ibrcommon::ThreadException &ex) {
132  IBRCOMMON_LOGGER_TAG(EpidemicRoutingExtension::TAG, error) << "componentDown failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
133  }
134  }
135 
136  const std::string EpidemicRoutingExtension::getTag() const throw ()
137  {
138  return "epidemic";
139  }
140 
// NOTE(review): the signature belonging to this body (listing line 141) is
// missing from this extract; presumably the thread cancellation hook
// (__cancellation) - confirm against the original file.
142  {
// presumably makes the poll() call in the worker loop stop waiting so the
// loop can terminate - confirm against the queue implementation
143  _taskqueue.abort();
144  }
145 
// NOTE(review): the signature belonging to this body (listing line 146) is
// missing from this extract; presumably the thread entry point run() -
// confirm against the original file. Several interior listing lines are
// missing as well; each gap is marked below.
//
// Worker loop: takes tasks from _taskqueue one at a time. For a
// SearchNextBundleTask it asks the storage seeker for bundles accepted by the
// local BundleFilter class and passes every result to transferTo(). The
// function returns when an exception derived from std::exception reaches the
// outermost handler (poll() is outside the inner try blocks, so anything it
// throws ends up there).
147  {
// NOTE(review): the class header (listing line 148) is missing from this
// extract; constructor and destructor names show a function-local class
// named BundleFilter. Its base class is not visible here.
149  {
150  public:
// Only references are stored (see the private members below), so all four
// arguments must outlive the filter object.
151  BundleFilter(const NeighborDatabase::NeighborEntry &entry, const std::set<dtn::core::Node> &neighbors, const dtn::core::FilterContext &context, const dtn::net::ConnectionManager::protocol_list &plist)
152  : _entry(entry), _neighbors(neighbors), _plist(plist), _context(context)
153  {};
154 
155  virtual ~BundleFilter() {};
156 
// Reports the neighbor's free transfer slots; presumably used by the storage
// query as the maximum number of bundles to select - TODO confirm.
157  virtual dtn::data::Size limit() const throw () { return _entry.getFreeTransferSlots(); };
158 
// Decides for one bundle whether it should be offered to the neighbor.
// Returns false as soon as one of the checks below rejects the bundle and
// true after the bundle has been stored in `result`.
159  virtual bool addIfSelected(dtn::storage::BundleResult &result, const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
160  {
161  // check Scope Control Block - do not forward bundles with hop limit == 0
162  if (meta.hopcount == 0)
163  {
164  return false;
165  }
166 
167  // do not forward local bundles
// NOTE(review): the condition (listing lines 168-169) is missing from this extract
170  )
171  {
172  return false;
173  }
174 
175  // check Scope Control Block - do not forward non-group bundles with hop limit <= 1
// NOTE(review): the condition (listing line 176) is missing from this extract
177  {
178  return false;
179  }
180 
181  // do not forward bundles addressed to this neighbor,
182  // because this is handled by neighbor routing extension
183  if (_entry.eid == meta.destination.getNode())
184  {
185  return false;
186  }
187 
188  // if this is a singleton bundle ...
// NOTE(review): the condition (listing line 189) is missing from this extract
190  {
191  const dtn::core::Node n(meta.destination.getNode());
192 
193  // do not forward the bundle if the final destination is available
// _neighbors is cleared by the caller when "prefer direct" is disabled, in
// which case this lookup never matches
194  if (_neighbors.find(n) != _neighbors.end())
195  {
196  return false;
197  }
198  }
199 
200  // do not forward bundles already known by the destination
201  // throws BloomfilterNotAvailableException if no filter is available or it is expired
202  try {
203  if (_entry.has(meta, true))
204  {
205  return false;
206  }
// NOTE(review): the catch clause (listing lines 207-208) is missing from this extract
209  }
210 
211  // update filter context
// work on a copy because _context is a const reference shared by all calls
212  dtn::core::FilterContext context = _context;
213  context.setMetaBundle(meta);
214 
215  // check bundle filter for each possible path
216  for (dtn::net::ConnectionManager::protocol_list::const_iterator it = _plist.begin(); it != _plist.end(); ++it)
217  {
218  const dtn::core::Node::Protocol &p = (*it);
219 
220  // update context with current protocol
221  context.setProtocol(p);
222 
223  // execute filtering
// NOTE(review): the filter evaluation and the condition guarding the block
// below (listing lines 224 and 226) are missing from this extract
225 
227  {
228  // put the selected bundle with targeted interface into the result-set
// the cast relies on the caller passing a RoutingResult, as the query in the
// worker loop below does with `list`
229  static_cast<RoutingResult&>(result).put(meta, p);
230  return true;
231  }
232  }
233 
234  return false;
235  };
236 
237  private:
238  const NeighborDatabase::NeighborEntry &_entry;
239  const std::set<dtn::core::Node> &_neighbors;
// NOTE(review): the declaration of _plist (listing line 240) is missing from this extract
241  const dtn::core::FilterContext &_context;
242  };
243 
244  // list for bundles
// declared outside the loop and cleared per task, so the object is reused
245  RoutingResult list;
246 
247  // set of known neighbors
248  std::set<dtn::core::Node> neighbors;
249 
250  while (true)
251  {
252  try {
253  Task *t = _taskqueue.poll();
// takes ownership of the task so it is deleted on every way out of this iteration
254  std::auto_ptr<Task> killer(t);
255 
256  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 50) << "processing task " << t->toString() << IBRCOMMON_LOGGER_ENDL;
257 
258  try {
// NOTE(review): listing lines 259-263 are missing from this extract
264  try {
// dynamic_cast to a reference throws std::bad_cast for any other task type;
// that case is silently ignored by the handler at the end of this try block
265  SearchNextBundleTask &task = dynamic_cast<SearchNextBundleTask&>(*t);
266 
267  // clear the result list
268  list.clear();
269 
270  // lock the neighbor database while searching for bundles
271  try {
272  NeighborDatabase &db = (**this).getNeighborDB();
273  ibrcommon::MutexLock l(db);
274  NeighborDatabase::NeighborEntry &entry = db.get(task.eid, true);
275 
276  // check if enough transfer slots available (threshold reached)
277  if (!entry.isTransferThresholdReached())
// NOTE(review): the statement controlled by this `if` (listing line 278) is
// missing from this extract; the NoMoreTransfersAvailable handler below
// suggests a throw - confirm
279 
// NOTE(review): the "prefer direct" condition and the statement that fills
// `neighbors` (listing lines 280 and 282) are missing from this extract
281  // get current neighbor list
283  } else {
284  // "prefer direct" option disabled - clear the list of neighbors
285  neighbors.clear();
286  }
287 
288  // get a list of protocols supported by both, the local BPA and the remote peer
// NOTE(review): the declaration of `plist` (listing lines 289-290) is missing from this extract
291 
292  // create a filter context
293  dtn::core::FilterContext context;
294  context.setPeer(entry.eid);
295  context.setRouting(*this);
296 
297  // get the bundle filter of the neighbor
298  const BundleFilter filter(entry, neighbors, context, plist);
299 
300  // some debug output
301  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 40) << "search some bundles not known by " << task.eid.getString() << IBRCOMMON_LOGGER_ENDL;
302 
303  // query some unknown bundle from the storage
304  (**this).getSeeker().get(filter, list);
305  } catch (const dtn::storage::BundleSelectorException&) {
306  // query a new summary vector from this neighbor
307  (**this).doHandshake(task.eid);
308  }
309 
310  // send the bundles as long as we have resources
// runs after the inner try block, i.e. after the MutexLock `l` has gone out of scope
311  for (RoutingResult::const_iterator iter = list.begin(); iter != list.end(); ++iter)
312  {
313  try {
314  // transfer the bundle to the neighbor
315  transferTo(task.eid, (*iter).first, (*iter).second);
// a bundle already in transit is skipped; the loop continues with the next one
316  } catch (const NeighborDatabase::AlreadyInTransitException&) { };
317  }
// the handlers below abandon the current task only; the worker loop keeps running
318  } catch (const NeighborDatabase::NoMoreTransfersAvailable &ex) {
319  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
320  } catch (const NeighborDatabase::EntryNotFoundException &ex) {
321  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
322  } catch (const NodeNotAvailableException &ex) {
323  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
324  } catch (const dtn::storage::NoBundleFoundException &ex) {
325  IBRCOMMON_LOGGER_DEBUG_TAG(TAG, 10) << "task " << t->toString() << " aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
326  } catch (const std::bad_cast&) { };
327  } catch (const ibrcommon::Exception &ex) {
328  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 20) << "task failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
329  }
330  } catch (const std::exception &ex) {
// the only way out of the loop: log the reason and leave the function
331  IBRCOMMON_LOGGER_DEBUG_TAG(EpidemicRoutingExtension::TAG, 15) << "terminated due to " << ex.what() << IBRCOMMON_LOGGER_ENDL;
332  return;
333  }
334 
335  yield();
336  }
337  }
338 
339  /****************************************/
340 
341  EpidemicRoutingExtension::SearchNextBundleTask::SearchNextBundleTask(const dtn::data::EID &e)
342  : eid(e)
343  { }
344 
345  EpidemicRoutingExtension::SearchNextBundleTask::~SearchNextBundleTask()
346  { }
347 
348  std::string EpidemicRoutingExtension::SearchNextBundleTask::toString()
349  {
350  return "SearchNextBundleTask: " + eid.getString();
351  }
352  }
353 }
static Configuration & getInstance(bool reset=false)
const std::set< dtn::core::Node > getNeighbors()
static dtn::data::EID local
Definition: BundleCore.h:79
bool get(dtn::data::PrimaryBlock::FLAGS flag) const
Definition: MetaBundle.cpp:160
static void add(EventReceiver< E > *receiver)
std::list< dtn::core::Node::Protocol > protocol_list
static void remove(const EventReceiver< E > *receiver)
NeighborDatabase::NeighborEntry & get(const dtn::data::EID &eid, bool noCached=false)
dtn::net::ConnectionManager & getConnectionManager()
Definition: BundleCore.cpp:260
BundleFilter::ACTION evaluate(BundleFilter::TABLE table, const FilterContext &context) const
Definition: BundleCore.cpp:617
void setProtocol(const dtn::core::Node::Protocol &protocol)
const Configuration::Network & getNetwork() const
void setPeer(const dtn::data::EID &endpoint)
void transferTo(const dtn::data::EID &destination, const dtn::data::MetaBundle &meta, const dtn::core::Node::Protocol)
virtual void requestHandshake(const dtn::data::EID &, NodeHandshake &) const
void setMetaBundle(const dtn::data::MetaBundle &data)
static const dtn::data::Number identifier
Definition: NodeHandshake.h:66
EID getNode() const
Definition: EID.cpp:528
const protocol_set getSupportedProtocols()
dtn::data::EID destination
Definition: MetaBundle.h:60
virtual void eventDataChanged(const dtn::data::EID &peer)
size_t Size
Definition: Number.h:34
void raiseEvent(const dtn::routing::NodeHandshakeEvent &evt)
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
void addRequest(const dtn::data::Number &identifier)
virtual void eventBundleQueued(const dtn::data::EID &peer, const dtn::data::MetaBundle &meta)
void setRouting(const dtn::routing::RoutingExtension &routing)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82
virtual const std::string getTag() const