27 #include <ibrcommon/Logger.h>
36 IBRCOMMON_LOGGER_DEBUG_TAG(
"SQLiteDatabase", 50) <<
"sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
39 const std::string SQLiteDatabase::TAG =
"SQLiteDatabase";
41 const std::string SQLiteDatabase::_select_names[] = {
42 "source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, expiretime, fragmentoffset, appdatalength, hopcount, netpriority, payloadlength, bytes",
43 "source, timestamp, sequencenumber, fragmentoffset, payloadlength, bytes",
44 "`source`, `timestamp`, `sequencenumber`, `fragmentoffset`, `fragmentlength`, `expiretime`"
47 const std::string SQLiteDatabase::_where_filter[] = {
48 "source = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND fragmentlength = ?",
49 "a.source = b.source AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.fragmentlength = b.fragmentlength"
52 const std::string SQLiteDatabase::_tables[] =
53 {
"bundles",
"blocks",
"routing",
"routing_bundles",
"routing_nodes",
"properties",
"bundle_set",
"bundle_set_names" };
56 const int SQLiteDatabase::DBSCHEMA_FRESH_VERSION = 8;
58 const int SQLiteDatabase::DBSCHEMA_VERSION = 8;
60 const std::string SQLiteDatabase::QUERY_SCHEMAVERSION =
"SELECT `value` FROM " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] +
" WHERE `key` = 'version' LIMIT 0,1;";
61 const std::string SQLiteDatabase::SET_SCHEMAVERSION =
"INSERT INTO " + SQLiteDatabase::_tables[SQLiteDatabase::SQL_TABLE_PROPERTIES] +
" (`key`, `value`) VALUES ('version', ?);";
63 const std::string SQLiteDatabase::_sql_queries[SQL_QUERIES_END] =
65 "SELECT " + _select_names[0] +
" FROM " + _tables[SQL_TABLE_BUNDLE],
66 "SELECT " + _select_names[0] +
" FROM " + _tables[SQL_TABLE_BUNDLE] +
" ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset, fragmentlength LIMIT ?,?;",
67 "SELECT " + _select_names[0] +
" FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE " + _where_filter[0] +
" LIMIT 1;",
68 "SELECT bytes FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE " + _where_filter[0] +
" LIMIT 1;",
69 "SELECT DISTINCT destination FROM " + _tables[SQL_TABLE_BUNDLE],
72 "SELECT " + _select_names[1] +
" FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE expiretime <= ?;",
73 "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +
" as a, "+ _tables[SQL_TABLE_BLOCK] +
" as b WHERE " + _where_filter[1] +
" AND a.expiretime <= ?;",
74 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE expiretime <= ?;",
75 "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +
" ORDER BY expiretime ASC LIMIT 1;",
77 "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +
" LIMIT 1;",
78 "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +
";",
80 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +
" WHERE " + _where_filter[0] +
";",
81 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +
";",
82 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +
" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority, hopcount, netpriority, payloadlength, bytes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);",
83 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +
" SET custodian = ? WHERE " + _where_filter[0] +
";",
85 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +
" SET procflags = ? WHERE " + _where_filter[0] +
";",
88 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +
" WHERE " + _where_filter[0] +
" ORDER BY ordernumber ASC;",
89 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +
" WHERE " + _where_filter[0] +
" AND ordernumber = ?;",
90 "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +
";",
91 "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +
" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?,?);",
94 "INSERT INTO " + _tables[SQL_TABLE_BUNDLE_SET] +
" (set_id, source, timestamp, sequencenumber, fragmentoffset, fragmentlength, expiretime) VALUES (?,?,?,?,?,?,?);",
95 "DELETE FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ?;",
96 "SELECT " + _select_names[2] +
" FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ? AND " + _where_filter[0] +
" LIMIT 1;",
97 "SELECT " + _select_names[2] +
" FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ? AND expiretime <= ?;",
98 "DELETE FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ? AND expiretime <= ?;",
99 "SELECT " + _select_names[2] +
" FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ?;",
100 "SELECT COUNT(*) FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ?;",
101 "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ? ORDER BY expiretime ASC LIMIT 1;",
102 "INSERT INTO " + _tables[SQL_TABLE_BUNDLE_SET] +
" SELECT " + _select_names[2] +
", ? FROM " + _tables[SQL_TABLE_BUNDLE_SET] +
" WHERE set_id = ?;",
105 "INSERT INTO " + _tables[SQL_TABLE_BUNDLE_SET_NAME] +
" (name, persistent) VALUES (?, ?);",
106 "SELECT id FROM " + _tables[SQL_TABLE_BUNDLE_SET_NAME] +
" WHERE name = ? AND persistent = ? LIMIT 0, 1;",
107 "DELETE FROM " + _tables[SQL_TABLE_BUNDLE_SET_NAME] +
" WHERE id = ? LIMIT 0, 1;",
112 const std::string SQLiteDatabase::_db_structure[SQLiteDatabase::DB_STRUCTURE_END] =
114 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] +
"` ( `key` INTEGER PRIMARY KEY ASC, `source` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `fragmentlength` INTEGER NOT NULL DEFAULT 0, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
115 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] +
"` ( `key` INTEGER PRIMARY KEY ASC, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL DEFAULT 0, `appdatalength` INTEGER NOT NULL DEFAULT 0, `fragmentlength` INTEGER NOT NULL DEFAULT 0, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL, `hopcount` INTEGER DEFAULT NULL, `netpriority` INTEGER NOT NULL DEFAULT 0, `payloadlength` INTEGER NOT NULL DEFAULT 0, `bytes` INTEGER NOT NULL DEFAULT 0);",
116 "CREATE TABLE IF NOT EXISTS "+ _tables[SQL_TABLE_ROUTING] +
" (INTEGER PRIMARY KEY ASC, KEY INT, Routing TEXT);",
117 "CREATE TABLE IF NOT EXISTS "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +
" (INTEGER PRIMARY KEY ASC, BundleID TEXT, KEY INT, Routing TEXT);",
118 "CREATE TABLE IF NOT EXISTS "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +
" (INTEGER PRIMARY KEY ASC, EID text, KEY INT, Routing TEXT);",
119 "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] +
" FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] +
" WHERE " + _tables[SQL_TABLE_BLOCK] +
".source = OLD.source AND " + _tables[SQL_TABLE_BLOCK] +
".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] +
".sequencenumber = OLD.sequencenumber AND " + _tables[SQL_TABLE_BLOCK] +
".fragmentoffset = OLD.fragmentoffset AND " + _tables[SQL_TABLE_BLOCK] +
".fragmentlength = OLD.fragmentlength; END;",
120 "CREATE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] +
" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength);",
121 "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] +
" (destination);",
122 "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] +
" (destination, priority);",
123 "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] +
" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength);"
124 "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] +
" (source, timestamp, sequencenumber, fragmentoffset, fragmentlength, expiretime);",
125 "CREATE TABLE IF NOT EXISTS '" + _tables[SQL_TABLE_PROPERTIES] +
"' ( `key` TEXT PRIMARY KEY ASC ON CONFLICT REPLACE, `value` TEXT NOT NULL);",
126 "CREATE TABLE IF NOT EXISTS " + _tables[SQL_TABLE_BUNDLE_SET] +
" (`source` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER NOT NULL, `fragmentlength` INTEGER NOT NULL, `expiretime` INTEGER, `set_id` INTEGER, PRIMARY KEY(`set_id`, `source`, `timestamp`, `sequencenumber`, `fragmentoffset`, `fragmentlength`));",
127 "CREATE TABLE IF NOT EXISTS " + _tables[SQL_TABLE_BUNDLE_SET_NAME] +
" (`id` INTEGER PRIMARY KEY, `name` TEXT NOT NULL, `persistent` INTEGER NOT NULL);",
128 "CREATE UNIQUE INDEX IF NOT EXISTS bundle_set_names_index ON " + _tables[SQL_TABLE_BUNDLE_SET_NAME] +
" (`name`, `persistent`);"
139 : _database(database), _st(NULL), _query(query)
147 sqlite3_finalize(_st);
160 sqlite3_clear_bindings(_st);
167 throw SQLiteQueryException(
"statement not prepared");
169 int ret = sqlite3_step(_st);
175 throw SQLiteQueryException(
"Database is corrupt: " + std::string(sqlite3_errmsg(_database)));
177 case SQLITE_INTERRUPT:
178 throw SQLiteQueryException(
"Database interrupt: " + std::string(sqlite3_errmsg(_database)));
181 throw SQLiteQueryException(
"Database schema error: " + std::string(sqlite3_errmsg(_database)));
184 throw SQLiteQueryException(
"Database error: " + std::string(sqlite3_errmsg(_database)));
194 throw SQLiteQueryException(
"already prepared");
196 int err = sqlite3_prepare_v2(_database, _query.c_str(),
static_cast<int>(_query.length()), &_st, 0);
198 if ( err != SQLITE_OK )
199 throw SQLiteQueryException(
"failed to prepare statement: " + _query);
205 : _file(file), _database(NULL), _next_expiration(0), _listener(listener), _faulty(false)
213 int SQLiteDatabase::getVersion() throw (
SQLiteDatabase::SQLiteQueryException)
219 Statement st(_database, QUERY_SCHEMAVERSION);
225 if (err == SQLITE_ROW)
227 std::string dbversion = (
const char*) sqlite3_column_text(*st, 0);
228 std::stringstream ss(dbversion);
235 void SQLiteDatabase::setVersion(
int version)
throw (SQLiteDatabase::SQLiteQueryException)
237 std::stringstream ss; ss << version;
238 Statement st(_database, SET_SCHEMAVERSION);
241 sqlite3_bind_text(*st, 1, ss.str().c_str(),
static_cast<int>(ss.str().length()), SQLITE_TRANSIENT);
244 if(err != SQLITE_DONE)
246 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) <<
"failed to set version " << err << IBRCOMMON_LOGGER_ENDL;
250 void SQLiteDatabase::doUpgrade(
int oldVersion,
int newVersion)
throw (ibrcommon::Exception)
252 if (oldVersion > newVersion)
254 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) <<
"Downgrade from version " << oldVersion <<
" to version " << newVersion <<
" is not possible." << IBRCOMMON_LOGGER_ENDL;
255 throw ibrcommon::Exception(
"Downgrade not possible.");
258 if ((oldVersion != 0) && (oldVersion < DBSCHEMA_FRESH_VERSION))
260 throw ibrcommon::Exception(
"Re-creation required.");
263 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, info) <<
"Upgrade from version " << oldVersion <<
" to version " << newVersion << IBRCOMMON_LOGGER_ENDL;
265 for (
int j = oldVersion; j < newVersion; ++j)
271 for (
size_t i = 0; i < SQL_TABLE_END; ++i)
273 Statement st(_database,
"DROP TABLE IF EXISTS " + _tables[i] +
";");
275 if(err != SQLITE_DONE)
277 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) <<
"drop table " << _tables[i] <<
" failed " << err << IBRCOMMON_LOGGER_ENDL;
282 for (
size_t i = 0; i < (DB_STRUCTURE_END - 1); ++i)
284 Statement st(_database, _db_structure[i]);
286 if(err != SQLITE_DONE)
288 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) <<
"failed to create table " << _tables[i] <<
"; err: " << err << IBRCOMMON_LOGGER_ENDL;
293 setVersion(DBSCHEMA_FRESH_VERSION);
294 j = DBSCHEMA_FRESH_VERSION;
299 if (DBSCHEMA_FRESH_VERSION > j)
300 throw ibrcommon::Exception(
"Re-creation required.");
311 if (sqlite3_threadsafe() == 0)
313 IBRCOMMON_LOGGER_TAG(
"SQLiteDatabase", critical) <<
"sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
314 throw ibrcommon::Exception(
"need threading support in sqlite!");
318 if (sqlite3_open_v2(_file.getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
320 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) <<
"Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
321 sqlite3_close(_database);
322 throw ibrcommon::Exception(
"Unable to open sqlite database");
326 int version = getVersion();
328 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, info) <<
"Database version " << version <<
" found." << IBRCOMMON_LOGGER_ENDL;
330 if (version != DBSCHEMA_VERSION)
332 doUpgrade(version, DBSCHEMA_VERSION);
334 }
catch (
const ibrcommon::Exception &ex) {
335 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, warning) <<
"upgrade failed, start-over with a fresh database" << IBRCOMMON_LOGGER_ENDL;
336 doUpgrade(0, DBSCHEMA_VERSION);
340 sqlite3_exec(_database,
"PRAGMA synchronous = OFF;", NULL, NULL, NULL);
343 if (IBRCOMMON_LOGGER_LEVEL >= 50)
349 update_expire_time();
355 if (sqlite3_close(_database) != SQLITE_OK)
357 IBRCOMMON_LOGGER_TAG(
"SQLiteDatabase", error) <<
"unable to close database" << IBRCOMMON_LOGGER_ENDL;
367 Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
370 set_bundleid(st,
id);
373 if ((st.
step() != SQLITE_ROW) || _faulty)
376 error <<
"No Bundle found with BundleID: " <<
id.toString();
377 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteDatabase::TAG, 15) << error.str() << IBRCOMMON_LOGGER_ENDL;
378 throw SQLiteQueryException(error.str());
388 bundle.source =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset) );
389 bundle.destination =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 1) );
390 bundle.reportto =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 2) );
391 bundle.custodian =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 3) );
393 IBRCOMMON_LOGGER_TAG(
TAG, warning) <<
"unable to read EIDs from database" << IBRCOMMON_LOGGER_ENDL;
394 throw SQLiteDatabase::SQLiteQueryException(
"unable to read EIDs from database");
397 bundle.procflags = sqlite3_column_int(*st, offset + 4);
398 bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
399 bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
400 bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
401 bundle.expiretime = sqlite3_column_int64(*st, offset + 8);
405 bundle.setFragment(
true);
406 bundle.fragmentoffset = sqlite3_column_int64(*st, offset + 9);
407 bundle.appdatalength = sqlite3_column_int64(*st, offset + 10);
411 bundle.setFragment(
false);
412 bundle.fragmentoffset = 0;
413 bundle.appdatalength = 0;
416 if (sqlite3_column_type(*st, offset + 11) != SQLITE_NULL)
418 bundle.hopcount = sqlite3_column_int64(*st, 11);
426 bundle.net_priority = sqlite3_column_int(*st, 12);
429 bundle.setPayloadLength(sqlite3_column_int64(*st, offset + 13));
435 bundle.source =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 0) );
436 bundle.destination =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 1) );
437 bundle.reportto =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 2) );
438 bundle.custodian =
dtn::data::EID( (
const char*) sqlite3_column_text(*st, offset + 3) );
440 IBRCOMMON_LOGGER_TAG(
TAG, warning) <<
"unable to read EIDs from database" << IBRCOMMON_LOGGER_ENDL;
441 throw SQLiteDatabase::SQLiteQueryException(
"unable to read EIDs from database");
444 bundle.procflags = sqlite3_column_int(*st, offset + 4);
445 bundle.timestamp = sqlite3_column_int64(*st, offset + 5);
446 bundle.sequencenumber = sqlite3_column_int64(*st, offset + 6);
447 bundle.lifetime = sqlite3_column_int64(*st, offset + 7);
452 bundle.fragmentoffset = sqlite3_column_int64(*st, offset + 9);
453 bundle.appdatalength = sqlite3_column_int64(*st, offset + 10);
459 Statement st(_database, _sql_queries[BUNDLE_GET_ITERATOR]);
461 while (st.
step() == SQLITE_ROW)
477 size_t items_added = 0;
479 const std::string base_query =
480 "SELECT " + _select_names[0] +
" FROM " + _tables[SQL_TABLE_BUNDLE];
483 const bool unlimited = (cb.limit() <= 0);
484 const size_t query_limit = 50;
491 const std::string query_string = base_query +
" WHERE " + query.
getWhere() +
" ORDER BY priority DESC, timestamp, sequencenumber, fragmentoffset, fragmentlength LIMIT ?,?;";
496 while (unlimited || (items_added < query_limit))
499 int bind_offset = query.
bind(*st, 1);
502 __get(cb, st, ret, items_added, bind_offset, offset, query_limit);
505 offset += query_limit;
507 }
catch (
const std::bad_cast&) {
508 Statement st(_database, _sql_queries[BUNDLE_GET_FILTER]);
510 while (unlimited || (items_added < query_limit))
513 __get(cb, st, ret, items_added, 1, offset, query_limit);
516 offset += query_limit;
520 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
528 const bool unlimited = (cb.limit() <= 0);
531 sqlite3_bind_int64(*st, bind_offset, offset);
532 sqlite3_bind_int64(*st, bind_offset + 1, query_limit);
535 if ((st.step() == SQLITE_DONE) || _faulty)
539 while (unlimited || (items_added < query_limit))
550 if (cb.addIfSelected(ret, m))
552 IBRCOMMON_LOGGER_DEBUG_TAG(
"SQLiteDatabase", 40) <<
"add bundle to query selection list: " << m.
toString() << IBRCOMMON_LOGGER_ENDL;
559 if (st.step() != SQLITE_ROW)
break;
569 IBRCOMMON_LOGGER_DEBUG_TAG(
"SQLiteDatabase", 25) <<
"get bundle from sqlite storage " <<
id.toString() << IBRCOMMON_LOGGER_ENDL;
572 Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
575 set_bundleid(st,
id);
578 if (((err = st.
step()) != SQLITE_ROW) || _faulty)
580 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteDatabase::TAG, 15) <<
"sql error: " << err <<
"; No bundle found with id: " <<
id.toString() << IBRCOMMON_LOGGER_ENDL;
592 Statement st(_database, _sql_queries[BLOCK_GET_ID]);
595 set_bundleid(st,
id);
598 while ((err = st.
step()) == SQLITE_ROW)
600 const ibrcommon::File f( (
const char*) sqlite3_column_text(*st, 0) );
601 int blocktyp = sqlite3_column_int(*st, 1);
606 if (err == SQLITE_DONE)
608 if (blocks.size() == 0)
610 IBRCOMMON_LOGGER_TAG(
"SQLiteDatabase", error) <<
"get_blocks: no blocks found for: " <<
id.toString() << IBRCOMMON_LOGGER_ENDL;
611 throw SQLiteQueryException(
"no blocks found");
616 IBRCOMMON_LOGGER_TAG(
"SQLiteDatabase", error) <<
"get_blocks() failure: "<< err <<
" " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
617 throw SQLiteQueryException(
"can not query for blocks");
620 }
catch (
const ibrcommon::Exception &ex) {
621 IBRCOMMON_LOGGER_TAG(
"SQLiteDatabase", error) <<
"could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
630 Statement st(_database, _sql_queries[BUNDLE_STORE]);
632 set_bundleid(st, bundle);
634 sqlite3_bind_text(*st, 6, bundle.destination.getString().c_str(),
static_cast<int>(bundle.destination.getString().length()), SQLITE_TRANSIENT);
635 sqlite3_bind_text(*st, 7, bundle.reportto.getString().c_str(),
static_cast<int>(bundle.reportto.getString().length()), SQLITE_TRANSIENT);
636 sqlite3_bind_text(*st, 8, bundle.custodian.getString().c_str(),
static_cast<int>(bundle.custodian.getString().length()), SQLITE_TRANSIENT);
637 sqlite3_bind_int(*st, 9, bundle.procflags.get<uint32_t>());
638 sqlite3_bind_int64(*st, 10, bundle.lifetime.get<uint64_t>());
642 sqlite3_bind_int64(*st, 11, bundle.appdatalength.get<uint64_t>());
646 sqlite3_bind_int64(*st, 11, -1);
650 sqlite3_bind_int64(*st, 12, expire_time.
get<uint64_t>());
652 sqlite3_bind_int64(*st, 13, bundle.getPriority());
658 sqlite3_bind_null(*st, 14 );
665 sqlite3_bind_int64(*st, 15, 0 );
669 sqlite3_bind_int64(*st, 16, bundle.getPayloadLength());
672 sqlite3_bind_int64(*st, 17, size);
676 if (err == SQLITE_CONSTRAINT)
678 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, warning) <<
"Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
681 error <<
"store() failure: " << err <<
" " << sqlite3_errmsg(_database);
682 throw SQLiteQueryException(error.str());
684 else if ((err != SQLITE_DONE) || _faulty)
687 error <<
"store() failure: " << err <<
" " << sqlite3_errmsg(_database);
688 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << error.str() << IBRCOMMON_LOGGER_ENDL;
690 throw SQLiteQueryException(error.str());
694 new_expire_time(expire_time);
699 int blocktyp = (int)block.getType();
702 Statement st(_database, _sql_queries[BLOCK_STORE]);
705 set_bundleid(st,
id);
708 sqlite3_bind_int(*st, 6, blocktyp);
711 sqlite3_bind_text(*st, 7, file.getPath().c_str(),
static_cast<int>(file.getPath().size()), SQLITE_TRANSIENT);
714 sqlite3_bind_int(*st, 8, index);
717 if (st.
step() != SQLITE_DONE)
719 throw SQLiteQueryException(
"can not store block of bundle");
728 int ret = sqlite3_exec(_database,
"BEGIN TRANSACTION;", NULL, NULL, &zErrMsg);
731 if ( ret != SQLITE_OK )
733 sqlite3_free( zErrMsg );
734 throw SQLiteQueryException( zErrMsg );
743 int ret = sqlite3_exec(_database,
"ROLLBACK TRANSACTION;", NULL, NULL, &zErrMsg);
746 if ( ret != SQLITE_OK )
748 sqlite3_free( zErrMsg );
749 throw SQLiteQueryException( zErrMsg );
758 int ret = sqlite3_exec(_database,
"END TRANSACTION;", NULL, NULL, &zErrMsg);
761 if ( ret != SQLITE_OK )
763 sqlite3_free( zErrMsg );
764 throw SQLiteQueryException( zErrMsg );
775 Statement st(_database, _sql_queries[BUNDLE_GET_LENGTH_ID]);
778 set_bundleid(st,
id);
781 if (st.
step() != SQLITE_ROW)
788 ret = sqlite3_column_int(*st, 0);
794 Statement st(_database, _sql_queries[BLOCK_GET_ID]);
797 set_bundleid(st,
id);
800 while (st.
step() == SQLITE_ROW)
803 ibrcommon::File blockfile( (
const char*)sqlite3_column_text(*st, 0) );
810 Statement st(_database, _sql_queries[BUNDLE_DELETE]);
813 set_bundleid(st,
id);
816 IBRCOMMON_LOGGER_DEBUG_TAG(
"SQLiteDatabase", 10) <<
"bundle " <<
id.toString() <<
" deleted" << IBRCOMMON_LOGGER_ENDL;
820 update_expire_time();
829 Statement bundle_clear(_database, _sql_queries[BUNDLE_CLEAR]);
830 Statement block_clear(_database, _sql_queries[BLOCK_CLEAR]);
835 if (SQLITE_DONE != vacuum.
step())
837 throw SQLiteQueryException(
"SQLiteBundleStore: clear(): vacuum failed.");
846 Statement st(_database, _sql_queries[BUNDLE_GET_ID]);
849 set_bundleid(st,
id);
852 return !((st.
step() != SQLITE_ROW) || _faulty);
857 Statement st(_database, _sql_queries[EMPTY_CHECK]);
859 if (SQLITE_DONE == st.
step())
874 Statement st(_database, _sql_queries[COUNT_ENTRIES]);
876 if ((err = st.
step()) == SQLITE_ROW)
878 rows = sqlite3_column_int(*st, 0);
883 error <<
"count: failure " << err <<
" " << sqlite3_errmsg(_database);
884 throw SQLiteQueryException(error.str());
892 std::set<dtn::data::EID> ret;
894 Statement st(_database, _sql_queries[GET_DISTINCT_DESTINATIONS]);
897 while (st.
step() == SQLITE_ROW)
900 const std::string destination( (
const char*)sqlite3_column_text(*st, 0) );
901 ret.insert(destination);
907 void SQLiteDatabase::update_expire_time() throw (
SQLiteDatabase::SQLiteQueryException)
909 Statement st(_database, _sql_queries[EXPIRE_NEXT_TIMESTAMP]);
913 if (err == SQLITE_ROW)
915 _next_expiration = sqlite3_column_int64(*st, 0);
919 _next_expiration = 0;
929 if ((timestamp < exp_time) || (exp_time == 0))
return;
938 Statement st(_database, _sql_queries[EXPIRE_BUNDLE_FILENAMES]);
941 sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
942 while (st.
step() == SQLITE_ROW)
944 ibrcommon::File block((
const char*)sqlite3_column_text(*st,0));
948 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
952 Statement st(_database, _sql_queries[EXPIRE_BUNDLES]);
957 sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
958 while (st.
step() == SQLITE_ROW)
960 id.source =
dtn::data::EID((
const char*)sqlite3_column_text(*st, 0));
961 id.timestamp = sqlite3_column_int64(*st, 1);
962 id.sequencenumber = sqlite3_column_int64(*st, 2);
964 id.setFragment(sqlite3_column_int64(*st, 3) >= 0);
966 if (
id.isFragment()) {
967 id.fragmentoffset = sqlite3_column_int64(*st, 3);
969 id.fragmentoffset = 0;
972 id.setPayloadLength(sqlite3_column_int64(*st, 4));
977 _listener.eventBundleExpired(
id, sqlite3_column_int(*st, 5));
980 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
984 Statement st(_database, _sql_queries[EXPIRE_BUNDLE_DELETE]);
987 sqlite3_bind_int64(*st, 1, timestamp.get<uint64_t>());
990 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
995 update_expire_time();
997 IBRCOMMON_LOGGER_TAG(SQLiteDatabase::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
1003 Statement st(_database, _sql_queries[VACUUM]);
1009 if (mode == UPDATE_CUSTODIAN)
1012 Statement st(_database, _sql_queries[BUNDLE_UPDATE_CUSTODIAN]);
1014 sqlite3_bind_text(*st, 1, eid.getString().c_str(),
static_cast<int>(eid.getString().length()), SQLITE_TRANSIENT);
1015 set_bundleid(st,
id, 1);
1018 int err = st.
step();
1020 if (err != SQLITE_DONE)
1023 error <<
"update_custodian() failure: " << err <<
" " << sqlite3_errmsg(_database);
1031 if (_next_expiration == 0 || ttl < _next_expiration)
1033 _next_expiration = ttl;
1037 void SQLiteDatabase::reset_expire_time() throw ()
1039 _next_expiration = 0;
1044 return _next_expiration;
1047 void SQLiteDatabase::set_bundleid(Statement &st,
const dtn::data::BundleID &
id,
int offset)
const throw (SQLiteDatabase::SQLiteQueryException)
1049 sqlite3_bind_text(*st, offset + 1,
id.source.getString().c_str(),
static_cast<int>(
id.source.getString().length()), SQLITE_TRANSIENT);
1050 sqlite3_bind_int64(*st, offset + 2,
id.timestamp.get<uint64_t>());
1051 sqlite3_bind_int64(*st, offset + 3,
id.sequencenumber.get<uint64_t>());
1053 if (
id.isFragment())
1055 sqlite3_bind_int64(*st, offset + 4,
id.fragmentoffset.get<uint64_t>());
1056 sqlite3_bind_int64(*st, offset + 5,
id.getPayloadLength());
1060 sqlite3_bind_int64(*st, offset + 4, -1);
1061 sqlite3_bind_int64(*st, offset + 5, -1);
std::string toString() const
virtual ~SQLBundleQuery()=0
virtual ~SQLiteDatabase()
Integer getPriority() const
bool contains(const dtn::data::BundleID &id)
void setFaulty(bool mode)
virtual const eid_set getDistinctDestinations()
static dtn::data::Timestamp getExpireTime(const dtn::data::Bundle &b)
void sql_tracer(void *, const char *pQuery)
Statement(sqlite3 *database, const std::string &)
virtual int bind(sqlite3_stmt *, int offset) const
static void raise(const dtn::data::Bundle &bundle)
void update(UPDATE_VALUES, const dtn::data::BundleID &id, const dtn::data::EID &)
sqlite3_stmt * operator*()
static SDNV< Size > max()
void store(const dtn::data::Bundle &bundle, const dtn::data::Length &size)
virtual void iterateDatabase(const dtn::data::MetaBundle &, const dtn::data::Length)=0
dtn::data::Size count() const
dtn::data::Length remove(const dtn::data::BundleID &id)
std::pair< int, const ibrcommon::File > blocklist_entry
void expire(const dtn::data::Timestamp ×tamp)
SQLiteDatabase(const ibrcommon::File &file, DatabaseListener &listener)
virtual void get(const BundleSelector &cb, BundleResult &result)
virtual const std::string getWhere() const =0
static bool isExpired(const dtn::data::Timestamp ×tamp, const dtn::data::Number &lifetime)
std::list< std::pair< int, const ibrcommon::File > > blocklist
Number getHopsToLive() const
virtual ~DatabaseListener()=0