26 #include <ibrcommon/Logger.h>
32 ibrcommon::Mutex SQLiteBundleSet::Factory::_create_lock;
55 return new SQLiteBundleSet(create(_sqldb, name),
true, listener, bf_size, _sqldb);
63 ibrcommon::MutexLock l(_create_lock);
68 }
while (__exists(db, name,
false));
70 return __create(db, name,
false);
75 ibrcommon::MutexLock l(_create_lock);
76 return __create(db, name,
true);
83 sqlite3_bind_text(*st1, 1, name.c_str(),
static_cast<int>(name.length()), SQLITE_TRANSIENT);
84 sqlite3_bind_int(*st1, 2, persistent ? 1 : 0);
89 sqlite3_bind_text(*st2, 1, name.c_str(),
static_cast<int>(name.length()), SQLITE_TRANSIENT);
90 sqlite3_bind_int(*st2, 2, persistent ? 1 : 0);
92 if (st2.
step() == SQLITE_ROW) {
93 return sqlite3_column_int64(*st2, 0);
102 sqlite3_bind_text(*st, 1, name.c_str(),
static_cast<int>(name.length()), SQLITE_TRANSIENT);
103 sqlite3_bind_int(*st, 2, persistent ? 1 : 0);
105 return ( st.
step() == SQLITE_ROW);
109 : _set_id(id), _bf_size(bf_size), _bf(bf_size * 8), _listener(listener), _consistent(true),_sqldb(database), _persistent(persistant)
114 rebuild_bloom_filter();
118 SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_EXPIRE_NEXT_TIMESTAMP]);
119 sqlite3_bind_int64(*st, 1, _set_id);
123 if (err == SQLITE_ROW)
125 _next_expiration = sqlite3_column_int64(*st, 0);
136 if (!_persistent) destroy();
139 void SQLiteBundleSet::destroy()
145 sqlite3_bind_int64(*st, 1, _set_id);
148 }
catch (
const SQLiteDatabase::SQLiteQueryException&) {
159 set->_bf_size = _bf_size;
161 set->_consistent = _consistent;
162 set->_next_expiration = _next_expiration;
167 sqlite3_bind_int64(*st, 1, set->_set_id);
168 sqlite3_bind_int64(*st, 2, _set_id);
175 return refcnt_ptr<dtn::data::BundleSetImpl>(set);
188 _bf_size = set._bf_size;
190 _consistent = set._consistent;
191 _next_expiration = set._next_expiration;
196 sqlite3_bind_int64(*st, 1, _set_id);
197 sqlite3_bind_int64(*st, 2, set._set_id);
203 }
catch (
const std::bad_cast&) {
214 sqlite3_bind_int64(*st, 1, _set_id);
215 sqlite3_bind_text(*st, 2, bundle.source.getString().c_str(),
static_cast<int>(bundle.source.getString().length()), SQLITE_TRANSIENT);
216 sqlite3_bind_int64(*st, 3, bundle.timestamp.get<uint64_t>());
217 sqlite3_bind_int64(*st, 4, bundle.sequencenumber.get<uint64_t>());
219 if (bundle.isFragment()) {
220 sqlite3_bind_int64(*st, 5, bundle.fragmentoffset.get<uint64_t>());
221 sqlite3_bind_int64(*st, 6, bundle.getPayloadLength());
223 sqlite3_bind_int64(*st, 5, -1);
224 sqlite3_bind_int64(*st, 6, -1);
227 sqlite3_bind_int64(*st, 7, bundle.expiretime.get<uint64_t>());
232 new_expire_time(bundle.expiretime);
245 sqlite3_bind_int64(*st, 1, _set_id);
259 if (!
id.isIn(_bf))
return false;
263 if (!_consistent)
return true;
266 SQLiteDatabase::Statement st( const_cast<sqlite3*>(_sqldb._database), SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET]);
268 sqlite3_bind_int64(*st, 1, _set_id);
269 sqlite3_bind_text(*st, 2,
id.source.getString().c_str(),
static_cast<int>(
id.source.getString().length()), SQLITE_TRANSIENT);
270 sqlite3_bind_int64(*st, 3,
id.timestamp.get<uint64_t>());
271 sqlite3_bind_int64(*st, 4,
id.sequencenumber.get<uint64_t>());
273 if (
id.isFragment()) {
274 sqlite3_bind_int64(*st, 5,
id.fragmentoffset.get<uint64_t>());
275 sqlite3_bind_int64(*st, 6,
id.getPayloadLength());
277 sqlite3_bind_int64(*st, 5, -1);
278 sqlite3_bind_int64(*st, 6, -1);
281 if (st.
step() == SQLITE_ROW)
293 if (timestamp == 0)
return;
296 if (_next_expiration > timestamp)
return;
299 if (_listener != NULL) {
302 sqlite3_bind_int64(*st, 1, _set_id);
305 sqlite3_bind_int64(*st, 2, timestamp.get<uint64_t>());
307 while (st.
step() == SQLITE_ROW)
310 get_bundleid(st,
id);
315 _listener->eventBundleExpired(bundle);
318 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
325 sqlite3_bind_int64(*st, 1, _set_id);
328 sqlite3_bind_int64(*st, 2, timestamp.get<uint64_t>());
332 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
336 rebuild_bloom_filter();
345 sqlite3_bind_int64(*st, 1, _set_id);
347 if (st.
step() == SQLITE_ROW)
349 rows = sqlite3_column_int(*st, 0);
370 std::set<dtn::data::MetaBundle> ret;
374 sqlite3_bind_int64(*st, 1, _set_id);
376 std::set<dtn::data::MetaBundle> ret;
379 while (st.
step() == SQLITE_ROW)
382 get_bundleid(st,
id);
384 if ( !
id.isIn(filter) )
387 ret.insert( bundle );
402 const char *data =
reinterpret_cast<const char*
>(_bf.table());
403 stream.write(data, _bf.size());
413 std::vector<char> buffer(count.get<
size_t>());
415 stream.read(&buffer[0], buffer.size());
418 _bf.load((
unsigned char*)&buffer[0], buffer.size());
428 if (_next_expiration == 0 || ttl < _next_expiration)
430 _next_expiration = ttl;
434 void SQLiteBundleSet::rebuild_bloom_filter()
440 SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_ALL]);
441 sqlite3_bind_int64(*st, 1, _set_id);
443 std::set<dtn::data::MetaBundle> ret;
446 while (st.step() == SQLITE_ROW)
449 get_bundleid(st,
id);
454 }
catch (
const SQLiteDatabase::SQLiteQueryException&) {
459 void SQLiteBundleSet::get_bundleid(SQLiteDatabase::Statement &st,
dtn::data::BundleID &
id,
int offset)
const throw (SQLiteDatabase::SQLiteQueryException)
461 id.source =
dtn::data::EID((
const char*)sqlite3_column_text(*st, offset + 0));
462 id.timestamp = sqlite3_column_int64(*st, offset + 1);
463 id.sequencenumber = sqlite3_column_int64(*st, offset + 2);
465 id.setFragment(sqlite3_column_int64(*st, offset + 2) >= 0);
467 if (
id.isFragment()) {
468 id.fragmentoffset = sqlite3_column_int64(*st, offset + 3);
469 id.setPayloadLength(sqlite3_column_int64(*st, offset + 4));
471 id.fragmentoffset = 0;
472 id.setPayloadLength(0);
dtn::data::BundleSetImpl * create(dtn::data::BundleSet::Listener *listener, dtn::data::Size bf_size)
Factory(SQLiteDatabase &db)
virtual std::ostream & serialize(std::ostream &stream) const
virtual dtn::data::Size size() const
static bool __exists(SQLiteDatabase &db, const std::string &name, bool persistent)
virtual refcnt_ptr< BundleSetImpl > copy() const
virtual std::istream & deserialize(std::istream &stream)
dtn::data::Length getLength() const
virtual void expire(const dtn::data::Timestamp timestamp)
const ibrcommon::BloomFilter & getBloomFilter() const
SQLiteBundleSet(const size_t id, bool persistant, dtn::data::BundleSet::Listener *listener, dtn::data::Size bf_size, dtn::storage::SQLiteDatabase &database)
virtual void assign(const refcnt_ptr< BundleSetImpl > &)
std::set< dtn::data::MetaBundle > getNotIn(const ibrcommon::BloomFilter &filter) const
virtual void add(const dtn::data::MetaBundle &bundle)
virtual bool has(const dtn::data::BundleID &bundle) const
static const std::string gen_chars(const size_t &length)
virtual ~SQLiteBundleSet()
dtn::data::SDNV< Size > Number
static size_t __create(SQLiteDatabase &db, const std::string &name, bool persistent)