41 #include <ibrcommon/Logger.h>
// Tag passed to the IBRCOMMON_LOGGER_DEBUG_TAG calls in this file.
50 const std::string Registration::TAG =
"Registration";
// Mutex locked by alloc_handle() and free_handle() before they touch _handles.
51 ibrcommon::Mutex Registration::_handle_lock;
// Set of handle strings: alloc_handle() inserts into it, free_handle() erases from it.
52 std::set<std::string> Registration::_handles;
54 const std::string Registration::gen_handle()
62 uint32_t *int_handle = (uint32_t*)new_handle.c_str();
65 new_handle = ss.str();
71 const std::string& Registration::alloc_handle(
const std::string &handle)
73 ibrcommon::MutexLock l(_handle_lock);
74 std::pair<std::set<std::string>::iterator,
bool> ret = _handles.insert(handle);
77 ret = _handles.insert(gen_handle());
83 const std::string& Registration::alloc_handle()
85 return alloc_handle(gen_handle());
// Removes `handle` from the shared handle set (_handles).
// @param handle  the handle string to erase
88 void Registration::free_handle(
const std::string &handle)
// Lock the handle mutex before modifying _handles.
90 ibrcommon::MutexLock l(_handle_lock);
91 _handles.erase(handle);
95 : _handle(alloc_handle(handle)),
96 _default_eid(core::
BundleCore::local), _no_more_bundles(false),
97 _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
103 : _handle(alloc_handle()),
104 _default_eid(core::
BundleCore::local), _no_more_bundles(false),
105 _persistent(false), _detached(false), _expiry(0), _filter_fragments(true)
112 free_handle(_handle);
117 ibrcommon::MutexLock l(_wait_for_cond);
120 _no_more_bundles =
false;
121 _wait_for_cond.signal(
true);
125 _notify_queue.push(call);
131 ibrcommon::MutexLock l(_wait_for_cond);
133 while (_no_more_bundles)
137 _wait_for_cond.wait(timeout);
141 _wait_for_cond.wait();
148 return _notify_queue.poll();
153 ibrcommon::MutexLock l(_endpoints_lock);
154 return (_endpoints.find(endpoint) != _endpoints.end());
159 ibrcommon::MutexLock l(_endpoints_lock);
183 return storage.
get(b);
188 ibrcommon::MutexLock l(_receive_lock);
196 }
catch (
const ibrcommon::QueueUnblockedException &e) {
197 if (e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
199 IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 25) <<
"search for more bundles" << IBRCOMMON_LOGGER_ENDL;
229 BundleFilter(
const std::set<dtn::data::EID> endpoints,
const RegistrationQueue &queue,
bool loopback,
bool fragment_filter)
230 : _endpoints(endpoints), _queue(queue), _loopback(loopback), _fragment_filter(fragment_filter)
245 if (_endpoints.find(meta.
destination) == _endpoints.end())
253 if (_endpoints.find(meta.
source) != _endpoints.end())
259 IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 30) <<
"search bundle in the list of delivered bundles: " << meta.
toString() << IBRCOMMON_LOGGER_ENDL;
261 if (_queue.has(meta))
// Returns the SQL condition text for the endpoint set: one "destination = ?"
// placeholder per entry in _endpoints, joined with OR and wrapped in
// parentheses when there is more than one entry.
270 const std::string getWhere()
const throw ()
272 if (_endpoints.size() > 1)
274 std::string where =
"(";
// Appends size-1 placeholders, each followed by " OR "; the final placeholder
// and the closing parenthesis are added by the return statement below.
276 for (
size_t i = _endpoints.size() - 1; i > 0; i--)
278 where +=
"destination = ? OR ";
281 return where +
"destination = ?)";
283 else if (_endpoints.size() == 1)
285 return "destination = ?";
// Presumably the fallback for an empty endpoint set.
// NOTE(review): in SQL a comparison "= null" is never true, so this condition
// presumably selects nothing - confirm that this is the intent.
289 return "destination = null";
// Binds the string form of every endpoint in _endpoints as a text parameter
// of the prepared statement `st`.
// NOTE(review): the definition of `o` and the return statement are not among
// the visible lines; presumably `o` starts at `offset` - confirm.
293 int bind(sqlite3_stmt *st,
int offset)
const throw ()
297 for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); ++iter)
299 const std::string data = (*iter).getString();
// SQLITE_TRANSIENT asks SQLite to make its own copy of the text, so the
// loop-local `data` does not have to outlive this call.
301 sqlite3_bind_text(st, o, data.c_str(),
static_cast<int>(data.size()), SQLITE_TRANSIENT);
310 const std::set<dtn::data::EID> _endpoints;
311 const RegistrationQueue &_queue;
312 const bool _loopback;
313 const bool _fragment_filter;
314 } filter(_endpoints, _queue,
false, fragment_conf && _filter_fragments);
317 ibrcommon::MutexLock l(_endpoints_lock);
322 _no_more_bundles =
true;
327 Registration::RegistrationQueue::RegistrationQueue()
331 Registration::RegistrationQueue::~RegistrationQueue()
340 ibrcommon::MutexLock l(_lock);
341 _recv_bundles.add(bundle);
343 IBRCOMMON_LOGGER_DEBUG_TAG(Registration::TAG, 10) <<
"[RegistrationQueue] add bundle to list of delivered bundles: " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
344 }
catch (
const ibrcommon::Exception&) { }
347 dtn::data::MetaBundle Registration::RegistrationQueue::pop() throw (const ibrcommon::QueueUnblockedException)
349 return _queue.take();
// Reports whether `bundle` is contained in _recv_bundles.
// @param bundle  the bundle ID to look up
// @return the result of _recv_bundles.has(bundle)
352 bool Registration::RegistrationQueue::has(
const dtn::data::BundleID &bundle)
const throw ()
// has() is a const member, so _lock is const here; the cast strips that,
// presumably because MutexLock needs a non-const Mutex reference.
354 ibrcommon::MutexLock l(const_cast<ibrcommon::Mutex&>(_lock));
355 return _recv_bundles.has(bundle);
360 ibrcommon::MutexLock l(_lock);
361 _recv_bundles.expire(timestamp);
364 void Registration::RegistrationQueue::abort() throw ()
369 void Registration::RegistrationQueue::reset() throw ()
377 ibrcommon::MutexLock l(_endpoints_lock);
380 _endpoints.insert(endpoint);
389 ibrcommon::MutexLock l(_endpoints_lock);
390 _endpoints.erase(endpoint);
398 return (_handle == other);
406 return (_handle == other._handle);
414 return (_handle < other._handle);
420 _notify_queue.abort();
422 ibrcommon::MutexLock l(_wait_for_cond);
423 _wait_for_cond.abort();
438 _expiry = lifetime + ibrcommon::Timer::get_current_time();
449 if(_expiry <= ibrcommon::Timer::get_current_time())
459 if(_expiry <= ibrcommon::Timer::get_current_time())
469 _filter_fragments = val;
482 ibrcommon::MutexLock l(_attach_lock);
490 ibrcommon::MutexLock l1(_wait_for_cond);
491 ibrcommon::MutexLock l2(_attach_lock);
496 _notify_queue.reset();
498 _wait_for_cond.reset();
static Configuration & getInstance(bool reset=false)
std::string toString() const
static void inject(const dtn::data::EID &source, dtn::data::Bundle &bundle)
void delivered(const dtn::data::MetaBundle &m) const
void setPersistent(ibrcommon::Timer::time_t lifetime)
static dtn::data::EID local
void subscribe(const dtn::data::EID &endpoint)
void setApplication(const dtn::data::Number &app)
dtn::data::Timestamp timestamp
const std::string & getHandle() const
bool hasSubscribed(const dtn::data::EID &endpoint)
virtual void get(const BundleSelector &cb, BundleResult &result)=0
dtn::data::MetaBundle receiveMetaBundle()
const dtn::data::EID & getDefaultEID() const
dtn::storage::BundleSeeker & getSeeker()
const Configuration::Network & getNetwork() const
void notify(const NOTIFY_CALL)
static dtn::data::Size max_bundles_in_transit
void unsubscribe(const dtn::data::EID &endpoint)
void setFilterFragments(bool val)
ibrcommon::Timer::time_t getExpireTime() const
dtn::data::Bundle receive()
bool operator<(const Registration &) const
static dtn::data::Timestamp getTime()
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
const std::set< dtn::data::EID > getSubscriptions()
static const std::string gen_chars(const size_t &length)
void relabel(bool zero_timestamp=false)
void wait_for_bundle(size_t timeout=0)
iterator find(block_t blocktype)
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
static void raise(const dtn::data::MetaBundle &meta, REASON_CODE reason=DELIVERED)
bool operator==(const std::string &) const
dtn::storage::BundleStorage & getStorage()
bool doFragmentation() const
static void processIncomingBundle(const dtn::data::EID &source, dtn::data::Bundle &bundle)
static BundleCore & getInstance()