IBR-DTN  1.0.0
Registration.cpp
Go to the documentation of this file.
1 /*
2  * Registration.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "api/Registration.h"
25 #include "storage/BundleStorage.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
28 #include "core/BundlePurgeEvent.h"
29 #include "core/FragmentManager.h"
31 
32 #ifdef HAVE_SQLITE
34 #endif
35 
37 #include <ibrdtn/data/AgeBlock.h>
38 
39 #include <ibrdtn/utils/Clock.h>
40 #include <ibrdtn/utils/Random.h>
41 #include <ibrcommon/Logger.h>
42 
43 #include <limits.h>
44 #include <stdint.h>
45 
46 namespace dtn
47 {
48  namespace api
49  {
50  const std::string Registration::TAG = "Registration";
51  ibrcommon::Mutex Registration::_handle_lock;
52  std::set<std::string> Registration::_handles;
53 
54  const std::string Registration::gen_handle()
55  {
56  std::string new_handle = dtn::utils::Random::gen_chars(16);
57 
58  // if the local host is configured with an IPN address
59  if (dtn::core::BundleCore::local.isCompressable())
60  {
61  // .. then use 32-bit numbers only
62  uint32_t *int_handle = (uint32_t*)new_handle.c_str();
63  std::stringstream ss;
64  ss << *int_handle;
65  new_handle = ss.str();
66  }
67 
68  return new_handle;
69  }
70 
71  const std::string& Registration::alloc_handle(const std::string &handle)
72  {
73  ibrcommon::MutexLock l(_handle_lock);
74  std::pair<std::set<std::string>::iterator, bool> ret = _handles.insert(handle);
75 
76  while (!ret.second) {
77  ret = _handles.insert(gen_handle());
78  }
79 
80  return (*ret.first);
81  }
82 
83  const std::string& Registration::alloc_handle()
84  {
85  return alloc_handle(gen_handle());
86  }
87 
88  void Registration::free_handle(const std::string &handle)
89  {
90  ibrcommon::MutexLock l(_handle_lock);
91  _handles.erase(handle);
92  }
93 
94  Registration::Registration(const std::string &handle)
95  : _handle(alloc_handle(handle)),
96  _default_eid(core::BundleCore::local), _no_more_bundles(false),
97  _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
98  {
99  _default_eid.setApplication(_handle);
100  }
101 
103  : _handle(alloc_handle()),
104  _default_eid(core::BundleCore::local), _no_more_bundles(false),
105  _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
106  {
107  _default_eid.setApplication(_handle);
108  }
109 
111  {
112  free_handle(_handle);
113  }
114 
116  {
117  ibrcommon::MutexLock l(_wait_for_cond);
118  if (call == NOTIFY_BUNDLE_AVAILABLE)
119  {
120  _no_more_bundles = false;
121  _wait_for_cond.signal(true);
122  }
123  else
124  {
125  _notify_queue.push(call);
126  }
127  }
128 
129  void Registration::wait_for_bundle(size_t timeout)
130  {
131  ibrcommon::MutexLock l(_wait_for_cond);
132 
133  while (_no_more_bundles)
134  {
135  if (timeout > 0)
136  {
137  _wait_for_cond.wait(timeout);
138  }
139  else
140  {
141  _wait_for_cond.wait();
142  }
143  }
144  }
145 
147  {
148  return _notify_queue.poll();
149  }
150 
152  {
153  ibrcommon::MutexLock l(_endpoints_lock);
154  return (_endpoints.find(endpoint) != _endpoints.end());
155  }
156 
157  const std::set<dtn::data::EID> Registration::getSubscriptions()
158  {
159  ibrcommon::MutexLock l(_endpoints_lock);
160  return _endpoints;
161  }
162 
164  {
165  // raise bundle event
167 
169  {
171  }
172  }
173 
174  dtn::data::Bundle Registration::receive() throw (dtn::storage::NoBundleFoundException)
175  {
176  // get the global storage
178 
179  // get the next bundles as MetaBundle
181 
182  // load the bundle
183  return storage.get(b);
184  }
185 
186  dtn::data::MetaBundle Registration::receiveMetaBundle() throw (dtn::storage::NoBundleFoundException)
187  {
188  ibrcommon::MutexLock l(_receive_lock);
189 
190  while(true)
191  {
192  try {
193  // get the first bundle in the queue
194  dtn::data::MetaBundle b = _queue.pop();
195  return b;
196  } catch (const ibrcommon::QueueUnblockedException &e) {
197  if (e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
198  {
199  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 25) << "search for more bundles" << IBRCOMMON_LOGGER_ENDL;
200 
201  // query for new bundles
202  underflow();
203  }
204  }
205  catch(const dtn::storage::NoBundleFoundException & ){
206  }
207  }
208 
210  }
211 
213  {
215 
216  // expire outdated bundles in the list
217  _queue.expire(dtn::utils::Clock::getTime());
218 
222 #ifdef HAVE_SQLITE
224 #else
226 #endif
227  {
228  public:
229  BundleFilter(const std::set<dtn::data::EID> endpoints, const RegistrationQueue &queue, bool loopback, bool fragment_filter)
230  : _endpoints(endpoints), _queue(queue), _loopback(loopback), _fragment_filter(fragment_filter)
231  {};
232 
233  virtual ~BundleFilter() {};
234 
235  virtual dtn::data::Size limit() const throw () { return dtn::core::BundleCore::max_bundles_in_transit; };
236 
237  virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const throw (dtn::storage::BundleSelectorException)
238  {
239  // filter fragments if requested
240  if (meta.isFragment() && _fragment_filter)
241  {
242  return false;
243  }
244 
245  if (_endpoints.find(meta.destination) == _endpoints.end())
246  {
247  return false;
248  }
249 
250  // filter own bundles
251  if (!_loopback)
252  {
253  if (_endpoints.find(meta.source) != _endpoints.end())
254  {
255  return false;
256  }
257  }
258 
259  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 30) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
260 
261  if (_queue.has(meta))
262  {
263  return false;
264  }
265 
266  return true;
267  };
268 
269 #ifdef HAVE_SQLITE
270  const std::string getWhere() const throw ()
271  {
272  if (_endpoints.size() > 1)
273  {
274  std::string where = "(";
275 
276  for (size_t i = _endpoints.size() - 1; i > 0; i--)
277  {
278  where += "destination = ? OR ";
279  }
280 
281  return where + "destination = ?)";
282  }
283  else if (_endpoints.size() == 1)
284  {
285  return "destination = ?";
286  }
287  else
288  {
289  return "destination = null";
290  }
291  };
292 
293  int bind(sqlite3_stmt *st, int offset) const throw ()
294  {
295  int o = offset;
296 
297  for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); ++iter)
298  {
299  const std::string data = (*iter).getString();
300 
301  sqlite3_bind_text(st, o, data.c_str(), static_cast<int>(data.size()), SQLITE_TRANSIENT);
302  o++;
303  }
304 
305  return o;
306  }
307 #endif
308 
309  private:
310  const std::set<dtn::data::EID> _endpoints;
311  const RegistrationQueue &_queue;
312  const bool _loopback;
313  const bool _fragment_filter;
314  } filter(_endpoints, _queue, false, fragment_conf && _filter_fragments);
315 
316  // query the database for more bundles
317  ibrcommon::MutexLock l(_endpoints_lock);
318 
319  try {
320  dtn::core::BundleCore::getInstance().getSeeker().get( filter, _queue );
321  } catch (const dtn::storage::NoBundleFoundException&) {
322  _no_more_bundles = true;
323  throw;
324  }
325  }
326 
327  Registration::RegistrationQueue::RegistrationQueue()
328  {
329  }
330 
331  Registration::RegistrationQueue::~RegistrationQueue()
332  {
333  }
334 
335  void Registration::RegistrationQueue::put(const dtn::data::MetaBundle &bundle) throw ()
336  {
337  try {
338  _queue.push(bundle);
339 
340  ibrcommon::MutexLock l(_lock);
341  _recv_bundles.add(bundle);
342 
343  IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 10) << "[RegistrationQueue] add bundle to list of delivered bundles: " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
344  } catch (const ibrcommon::Exception&) { }
345  }
346 
347  dtn::data::MetaBundle Registration::RegistrationQueue::pop() throw (const ibrcommon::QueueUnblockedException)
348  {
349  return _queue.take();
350  }
351 
352  bool Registration::RegistrationQueue::has(const dtn::data::BundleID &bundle) const throw ()
353  {
354  ibrcommon::MutexLock l(const_cast<ibrcommon::Mutex&>(_lock));
355  return _recv_bundles.has(bundle);
356  }
357 
358  void Registration::RegistrationQueue::expire(const dtn::data::Timestamp &timestamp) throw ()
359  {
360  ibrcommon::MutexLock l(_lock);
361  _recv_bundles.expire(timestamp);
362  }
363 
364  void Registration::RegistrationQueue::abort() throw ()
365  {
366  _queue.abort();
367  }
368 
369  void Registration::RegistrationQueue::reset() throw ()
370  {
371  _queue.reset();
372  }
373 
375  {
376  {
377  ibrcommon::MutexLock l(_endpoints_lock);
378 
379  // add endpoint to the local set
380  _endpoints.insert(endpoint);
381  }
382 
383  // trigger the search for new bundles
385  }
386 
388  {
389  ibrcommon::MutexLock l(_endpoints_lock);
390  _endpoints.erase(endpoint);
391  }
392 
396  bool Registration::operator==(const std::string &other) const
397  {
398  return (_handle == other);
399  }
400 
404  bool Registration::operator==(const Registration &other) const
405  {
406  return (_handle == other._handle);
407  }
408 
412  bool Registration::operator<(const Registration &other) const
413  {
414  return (_handle < other._handle);
415  }
416 
418  {
419  _queue.abort();
420  _notify_queue.abort();
421 
422  ibrcommon::MutexLock l(_wait_for_cond);
423  _wait_for_cond.abort();
424  }
425 
427  {
428  return _default_eid;
429  }
430 
431  const std::string& Registration::getHandle() const
432  {
433  return _handle;
434  }
435 
436  void Registration::setPersistent(ibrcommon::Timer::time_t lifetime)
437  {
438  _expiry = lifetime + ibrcommon::Timer::get_current_time();
439  _persistent = true;
440  }
441 
443  {
444  _persistent = false;
445  }
446 
448  {
449  if(_expiry <= ibrcommon::Timer::get_current_time())
450  {
451  _persistent = false;
452  }
453 
454  return _persistent;
455  }
456 
458  {
459  if(_expiry <= ibrcommon::Timer::get_current_time())
460  {
461  return false;
462  }
463 
464  return _persistent;
465  }
466 
468  {
469  _filter_fragments = val;
470  }
471 
472  ibrcommon::Timer::time_t Registration::getExpireTime() const
473  {
474  if(!isPersistent()) throw NotPersistentException("Registration is not persistent.");
475 
476  return _expiry;
477 
478  }
479 
481  {
482  ibrcommon::MutexLock l(_attach_lock);
483  if(!_detached) throw AlreadyAttachedException("Registration is already attached to a client.");
484 
485  _detached = false;
486  }
487 
489  {
490  ibrcommon::MutexLock l1(_wait_for_cond);
491  ibrcommon::MutexLock l2(_attach_lock);
492 
493  _detached = true;
494 
495  _queue.reset();
496  _notify_queue.reset();
497 
498  _wait_for_cond.reset();
499  }
500 
502  {
503  // check address fields for "api:me", this has to be replaced
504  static const dtn::data::EID clienteid("api:me");
505 
506  // create a new sequence number
507  bundle.relabel();
508 
509  // if the relabeling results in a zero timestamp, add an ageblock
510  if (bundle.timestamp == 0)
511  {
512  // check for ageblock
513  try {
514  bundle.find<dtn::data::AgeBlock>();
516  // add a new ageblock
518  }
519  }
520 
521  // set the source address to the sending EID
522  bundle.source = source;
523 
524  if (bundle.destination == clienteid) bundle.destination = source;
525  if (bundle.reportto == clienteid) bundle.reportto = source;
526  if (bundle.custodian == clienteid) bundle.custodian = source;
527 
528  // inject the bundle
529  dtn::core::BundleCore::inject(source, bundle);
530  }
531  }
532 }
static Configuration & getInstance(bool reset=false)
std::string toString() const
Definition: BundleID.cpp:190
static void inject(const dtn::data::EID &source, dtn::data::Bundle &bundle)
Definition: BundleCore.cpp:706
void delivered(const dtn::data::MetaBundle &m) const
void setPersistent(ibrcommon::Timer::time_t lifetime)
static dtn::data::EID local
Definition: BundleCore.h:79
bool get(dtn::data::PrimaryBlock::FLAGS flag) const
Definition: MetaBundle.cpp:160
void subscribe(const dtn::data::EID &endpoint)
T & push_front()
Definition: Bundle.h:161
void setApplication(const dtn::data::Number &app)
Definition: EID.cpp:403
dtn::data::Timestamp timestamp
Definition: BundleID.h:54
const std::string & getHandle() const
bool hasSubscribed(const dtn::data::EID &endpoint)
virtual void get(const BundleSelector &cb, BundleResult &result)=0
dtn::data::MetaBundle receiveMetaBundle()
const dtn::data::EID & getDefaultEID() const
dtn::storage::BundleSeeker & getSeeker()
Definition: BundleCore.cpp:246
const Configuration::Network & getNetwork() const
void notify(const NOTIFY_CALL)
static dtn::data::Size max_bundles_in_transit
Definition: BundleCore.h:179
void unsubscribe(const dtn::data::EID &endpoint)
void setFilterFragments(bool val)
ibrcommon::Timer::time_t getExpireTime() const
dtn::data::Bundle receive()
bool operator<(const Registration &) const
bool isFragment() const
Definition: MetaBundle.cpp:165
static dtn::data::Timestamp getTime()
Definition: Clock.cpp:167
dtn::data::EID destination
Definition: MetaBundle.h:60
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
const std::set< dtn::data::EID > getSubscriptions()
static const std::string gen_chars(const size_t &length)
Definition: Random.cpp:56
size_t Size
Definition: Number.h:34
void relabel(bool zero_timestamp=false)
void wait_for_bundle(size_t timeout=0)
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
Definition: BundleEvent.cpp:78
static void raise(const dtn::data::MetaBundle &meta, REASON_CODE reason=DELIVERED)
bool operator==(const std::string &) const
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
dtn::data::EID source
Definition: BundleID.h:53
static BundleCore & getInstance()
Definition: BundleCore.cpp:82