IBR-DTN  1.0.0
SQLiteBundleSet.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteBundleSet.cpp
3  *
4  * Copyright (C) 2013 IBR, TU Braunschweig
5  *
6  * Written-by: David Goltzsche <goltzsch@ibr.cs.tu-bs.de>
7  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  * Created on: 19.04.2013
22  */
23 
25 #include <ibrdtn/utils/Random.h>
26 #include <ibrcommon/Logger.h>
27 
28 namespace dtn
29 {
30  namespace storage
31  {
32  ibrcommon::Mutex SQLiteBundleSet::Factory::_create_lock;
33 
35  : _sqldb(db)
36  {
37  }
38 
40  {
41  }
42 
44  {
45  try {
46  return new SQLiteBundleSet(create(_sqldb), false, listener, bf_size, _sqldb);
47  } catch (const SQLiteDatabase::SQLiteQueryException&) {
48  return NULL;
49  }
50  }
51 
53  {
54  try {
55  return new SQLiteBundleSet(create(_sqldb, name), true, listener, bf_size, _sqldb);
56  } catch (const SQLiteDatabase::SQLiteQueryException&) {
57  return NULL;
58  }
59  }
60 
62  {
63  ibrcommon::MutexLock l(_create_lock);
64 
65  std::string name;
66  do {
68  } while (__exists(db, name, false));
69 
70  return __create(db, name, false);
71  }
72 
74  {
75  ibrcommon::MutexLock l(_create_lock);
76  return __create(db, name, true);
77  }
78 
79  size_t SQLiteBundleSet::Factory::__create(SQLiteDatabase &db, const std::string &name, bool persistent) throw (SQLiteDatabase::SQLiteQueryException)
80  {
81  // create a new name (fails, if the name already exists)
82  SQLiteDatabase::Statement st1(db._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_ADD]);
83  sqlite3_bind_text(*st1, 1, name.c_str(), static_cast<int>(name.length()), SQLITE_TRANSIENT);
84  sqlite3_bind_int(*st1, 2, persistent ? 1 : 0);
85  st1.step();
86 
87  // get the ID of the name
88  SQLiteDatabase::Statement st2(db._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_GET_ID]);
89  sqlite3_bind_text(*st2, 1, name.c_str(), static_cast<int>(name.length()), SQLITE_TRANSIENT);
90  sqlite3_bind_int(*st2, 2, persistent ? 1 : 0);
91 
92  if (st2.step() == SQLITE_ROW) {
93  return sqlite3_column_int64(*st2, 0);
94  }
95 
96  throw SQLiteDatabase::SQLiteQueryException("could not create the bundle-set name");
97  }
98 
99  bool SQLiteBundleSet::Factory::__exists(SQLiteDatabase &db, const std::string &name, bool persistent) throw (SQLiteDatabase::SQLiteQueryException)
100  {
101  SQLiteDatabase::Statement st(db._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_GET_ID]);
102  sqlite3_bind_text(*st, 1, name.c_str(), static_cast<int>(name.length()), SQLITE_TRANSIENT);
103  sqlite3_bind_int(*st, 2, persistent ? 1 : 0);
104 
105  return ( st.step() == SQLITE_ROW);
106  }
107 
109  : _set_id(id), _bf_size(bf_size), _bf(bf_size * 8), _listener(listener), _consistent(true),_sqldb(database), _persistent(persistant)
110  {
111  // if this is a persistent bundle-set
112  if (_persistent) {
113  // rebuild the bloom filter
114  rebuild_bloom_filter();
115 
116  // load the next expiration from the storage
117  try {
118  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_EXPIRE_NEXT_TIMESTAMP]);
119  sqlite3_bind_int64(*st, 1, _set_id);
120 
121  int err = st.step();
122 
123  if (err == SQLITE_ROW)
124  {
125  _next_expiration = sqlite3_column_int64(*st, 0);
126  }
127  } catch (const SQLiteDatabase::SQLiteQueryException&) {
128  // error
129  }
130  }
131  }
132 
134  {
135  // clear on deletion if this set is not persistent
136  if (!_persistent) destroy();
137  }
138 
139  void SQLiteBundleSet::destroy()
140  {
141  clear();
142 
143  try {
144  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_NAME_REMOVE]);
145  sqlite3_bind_int64(*st, 1, _set_id);
146 
147  st.step();
148  } catch (const SQLiteDatabase::SQLiteQueryException&) {
149  // error
150  }
151  }
152 
153  refcnt_ptr<dtn::data::BundleSetImpl> SQLiteBundleSet::copy() const
154  {
155  // create a new bundle-set
156  SQLiteBundleSet *set = new SQLiteBundleSet(Factory::create(_sqldb), false, NULL, _bf_size, _sqldb);
157 
158  // copy Bloom-filter
159  set->_bf_size = _bf_size;
160  set->_bf = _bf;
161  set->_consistent = _consistent;
162  set->_next_expiration = _next_expiration;
163 
164  // copy all entries
165  try {
166  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_COPY]);
167  sqlite3_bind_int64(*st, 1, set->_set_id); // destination
168  sqlite3_bind_int64(*st, 2, _set_id); // source
169 
170  st.step();
171  } catch (const SQLiteDatabase::SQLiteQueryException&) {
172  // error
173  }
174 
175  return refcnt_ptr<dtn::data::BundleSetImpl>(set);
176  }
177 
178  void SQLiteBundleSet::assign(const refcnt_ptr<BundleSetImpl> &other)
179  {
180  // clear all bundles first
181  clear();
182 
183  // cast the given set to a MemoryBundleSet
184  try {
185  const SQLiteBundleSet &set = dynamic_cast<const SQLiteBundleSet&>(*other);
186 
187  // copy Bloom-filter
188  _bf_size = set._bf_size;
189  _bf = set._bf;
190  _consistent = set._consistent;
191  _next_expiration = set._next_expiration;
192 
193  // copy all entries
194  try {
195  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_COPY]);
196  sqlite3_bind_int64(*st, 1, _set_id); // destination
197  sqlite3_bind_int64(*st, 2, set._set_id); // source
198 
199  st.step();
200  } catch (const SQLiteDatabase::SQLiteQueryException&) {
201  // error
202  }
203  } catch (const std::bad_cast&) {
204  // incompatible bundle-set implementation - abort here
205  }
206  }
207 
208  void SQLiteBundleSet::add(const dtn::data::MetaBundle &bundle) throw ()
209  {
210  try {
211  // insert bundle id into database
212  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_ADD]);
213 
214  sqlite3_bind_int64(*st, 1, _set_id);
215  sqlite3_bind_text(*st, 2, bundle.source.getString().c_str(), static_cast<int>(bundle.source.getString().length()), SQLITE_TRANSIENT);
216  sqlite3_bind_int64(*st, 3, bundle.timestamp.get<uint64_t>());
217  sqlite3_bind_int64(*st, 4, bundle.sequencenumber.get<uint64_t>());
218 
219  if (bundle.isFragment()) {
220  sqlite3_bind_int64(*st, 5, bundle.fragmentoffset.get<uint64_t>());
221  sqlite3_bind_int64(*st, 6, bundle.getPayloadLength());
222  } else {
223  sqlite3_bind_int64(*st, 5, -1);
224  sqlite3_bind_int64(*st, 6, -1);
225  }
226 
227  sqlite3_bind_int64(*st, 7, bundle.expiretime.get<uint64_t>());
228 
229  st.step();
230 
231  // update expiretime, if necessary
232  new_expire_time(bundle.expiretime);
233 
234  // add bundle to the bloomfilter
235  bundle.addTo(_bf);
236  } catch (const SQLiteDatabase::SQLiteQueryException&) {
237  // error
238  }
239  }
240 
241  void SQLiteBundleSet::clear() throw ()
242  {
243  try {
244  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_CLEAR]);
245  sqlite3_bind_int64(*st, 1, _set_id);
246 
247  st.step();
248  } catch (const SQLiteDatabase::SQLiteQueryException&) {
249  // error
250  }
251 
252  // clear the bloom-filter
253  _bf.clear();
254  }
255 
256  bool SQLiteBundleSet::has(const dtn::data::BundleID &id) const throw ()
257  {
258  // check bloom-filter first
259  if (!id.isIn(_bf)) return false;
260 
261  // Return true if the bloom-filter is not consistent with
262  // the bundles set. This happen if the MemoryBundleSet gets deserialized.
263  if (!_consistent) return true;
264 
265  try {
266  SQLiteDatabase::Statement st( const_cast<sqlite3*>(_sqldb._database), SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET]);
267 
268  sqlite3_bind_int64(*st, 1, _set_id);
269  sqlite3_bind_text(*st, 2, id.source.getString().c_str(), static_cast<int>(id.source.getString().length()), SQLITE_TRANSIENT);
270  sqlite3_bind_int64(*st, 3, id.timestamp.get<uint64_t>());
271  sqlite3_bind_int64(*st, 4, id.sequencenumber.get<uint64_t>());
272 
273  if (id.isFragment()) {
274  sqlite3_bind_int64(*st, 5, id.fragmentoffset.get<uint64_t>());
275  sqlite3_bind_int64(*st, 6, id.getPayloadLength());
276  } else {
277  sqlite3_bind_int64(*st, 5, -1);
278  sqlite3_bind_int64(*st, 6, -1);
279  }
280 
281  if (st.step() == SQLITE_ROW)
282  return true;
283  } catch (const SQLiteDatabase::SQLiteQueryException&) {
284  // error
285  }
286 
287  return false;
288  }
289 
290  void SQLiteBundleSet::expire(const dtn::data::Timestamp timestamp) throw ()
291  {
292  // we can not expire bundles if we have no idea of time
293  if (timestamp == 0) return;
294 
295  // do not expire if its not the time
296  if (_next_expiration > timestamp) return;
297 
298  // look for expired bundles and announce them in the listener
299  if (_listener != NULL) {
300  try {
301  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_EXPIRED]);
302  sqlite3_bind_int64(*st, 1, _set_id);
303 
304  // set expiration timestamp
305  sqlite3_bind_int64(*st, 2, timestamp.get<uint64_t>());
306 
307  while (st.step() == SQLITE_ROW)
308  {
310  get_bundleid(st, id);
311 
313 
314  // raise bundle expired event
315  _listener->eventBundleExpired(bundle);
316  }
317  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
318  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
319  }
320  }
321 
322  // delete expired bundles
323  try {
324  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_EXPIRE]);
325  sqlite3_bind_int64(*st, 1, _set_id);
326 
327  // set expiration timestamp
328  sqlite3_bind_int64(*st, 2, timestamp.get<uint64_t>());
329 
330  st.step();
331  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
332  IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
333  }
334 
335  // rebuild the bloom filter
336  rebuild_bloom_filter();
337  }
338 
340  {
341  int rows = 0;
342 
343  try {
344  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_COUNT]);
345  sqlite3_bind_int64(*st, 1, _set_id);
346 
347  if (st.step() == SQLITE_ROW)
348  {
349  rows = sqlite3_column_int(*st, 0);
350  }
351  } catch (const SQLiteDatabase::SQLiteQueryException&) {
352  // error
353  }
354 
355  return rows;
356  }
357 
359  {
360  return dtn::data::Number(_bf.size()).getLength() + _bf.size();
361  }
362 
363  const ibrcommon::BloomFilter& SQLiteBundleSet::getBloomFilter() const throw ()
364  {
365  return _bf;
366  }
367 
368  std::set<dtn::data::MetaBundle> SQLiteBundleSet::getNotIn(const ibrcommon::BloomFilter &filter) const throw ()
369  {
370  std::set<dtn::data::MetaBundle> ret;
371 
372  try {
373  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_ALL]);
374  sqlite3_bind_int64(*st, 1, _set_id);
375 
376  std::set<dtn::data::MetaBundle> ret;
378 
379  while (st.step() == SQLITE_ROW)
380  {
381  // get the bundle id
382  get_bundleid(st, id);
383 
384  if ( ! id.isIn(filter) )
385  {
387  ret.insert( bundle );
388  }
389  }
390  } catch (const SQLiteDatabase::SQLiteQueryException&) {
391  // error
392  }
393 
394  return ret;
395  }
396 
397  std::ostream& SQLiteBundleSet::serialize(std::ostream &stream) const
398  {
399  dtn::data::Number size(_bf.size());
400  stream << size;
401 
402  const char *data = reinterpret_cast<const char*>(_bf.table());
403  stream.write(data, _bf.size());
404 
405  return stream;
406  }
407 
/**
 * Read a Bloom filter from a stream into this set.
 *
 * Expects the byte count encoded as dtn::data::Number followed by that many
 * bytes of filter data. Afterwards the set is marked as not consistent, so
 * has() answers Bloom filter hits without asking the database.
 *
 * @param stream the stream to read from
 * @return the given stream
 */
408  std::istream& SQLiteBundleSet::deserialize(std::istream &stream)
409  {
// number of bytes that follow
410  dtn::data::Number count;
411  stream >> count;
412 
413  std::vector<char> buffer(count.get<size_t>());
414 
// NOTE(review): &buffer[0] assumes count > 0 and the result of read() is not
// checked - confirm that callers never pass an empty or truncated stream.
415  stream.read(&buffer[0], buffer.size());
416 
// NOTE(review): the embedded listing numbers jump from 416 to 418 here, so a
// statement may be missing from this copy of the file - confirm against the
// upstream source before changing this function.
418  _bf.load((unsigned char*)&buffer[0], buffer.size());
419 
420  // set the set to in-consistent mode
421  _consistent = false;
422 
423  return stream;
424  }
425 
426  void SQLiteBundleSet::new_expire_time(const dtn::data::Timestamp &ttl) throw ()
427  {
428  if (_next_expiration == 0 || ttl < _next_expiration)
429  {
430  _next_expiration = ttl;
431  }
432  }
433 
434  void SQLiteBundleSet::rebuild_bloom_filter()
435  {
436  // rebuild the bloom-filter
437  _bf.clear();
438 
439  try {
440  SQLiteDatabase::Statement st(_sqldb._database, SQLiteDatabase::_sql_queries[SQLiteDatabase::BUNDLE_SET_GET_ALL]);
441  sqlite3_bind_int64(*st, 1, _set_id);
442 
443  std::set<dtn::data::MetaBundle> ret;
445 
446  while (st.step() == SQLITE_ROW)
447  {
448  // get the bundle id
449  get_bundleid(st, id);
450  id.addTo(_bf);
451  }
452 
453  _consistent = true;
454  } catch (const SQLiteDatabase::SQLiteQueryException&) {
455  // error
456  }
457  }
458 
459  void SQLiteBundleSet::get_bundleid(SQLiteDatabase::Statement &st, dtn::data::BundleID &id, int offset) const throw (SQLiteDatabase::SQLiteQueryException)
460  {
461  id.source = dtn::data::EID((const char*)sqlite3_column_text(*st, offset + 0));
462  id.timestamp = sqlite3_column_int64(*st, offset + 1);
463  id.sequencenumber = sqlite3_column_int64(*st, offset + 2);
464  dtn::data::Number fragmentoffset = 0;
465  id.setFragment(sqlite3_column_int64(*st, offset + 2) >= 0);
466 
467  if (id.isFragment()) {
468  id.fragmentoffset = sqlite3_column_int64(*st, offset + 3);
469  id.setPayloadLength(sqlite3_column_int64(*st, offset + 4));
470  } else {
471  id.fragmentoffset = 0;
472  id.setPayloadLength(0);
473  }
474  }
475  } /* namespace storage */
476 } /* namespace dtn */
dtn::data::BundleSetImpl * create(dtn::data::BundleSet::Listener *listener, dtn::data::Size bf_size)
virtual std::ostream & serialize(std::ostream &stream) const
virtual dtn::data::Size size() const
static bool __exists(SQLiteDatabase &db, const std::string &name, bool persistent)
virtual refcnt_ptr< BundleSetImpl > copy() const
virtual std::istream & deserialize(std::istream &stream)
size_t Length
Definition: Number.h:33
dtn::data::Length getLength() const
virtual void expire(const dtn::data::Timestamp timestamp)
const ibrcommon::BloomFilter & getBloomFilter() const
SQLiteBundleSet(const size_t id, bool persistant, dtn::data::BundleSet::Listener *listener, dtn::data::Size bf_size, dtn::storage::SQLiteDatabase &database)
virtual void assign(const refcnt_ptr< BundleSetImpl > &)
std::set< dtn::data::MetaBundle > getNotIn(const ibrcommon::BloomFilter &filter) const
virtual void add(const dtn::data::MetaBundle &bundle)
virtual bool has(const dtn::data::BundleID &bundle) const
static const std::string gen_chars(const size_t &length)
Definition: Random.cpp:56
size_t Size
Definition: Number.h:34
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
dtn::data::SDNV< Size > Number
Definition: Number.h:38
static size_t __create(SQLiteDatabase &db, const std::string &name, bool persistent)