00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "core/SQLiteBundleStorage.h"
00010 #include "core/TimeEvent.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "core/SQLiteConfigure.h"
00014 #include "core/BundleEvent.h"
00015 #include "core/BundleExpiredEvent.h"
00016
00017 #include <ibrdtn/data/PayloadBlock.h>
00018 #include <ibrdtn/data/Serializer.h>
00019 #include <ibrdtn/data/Bundle.h>
00020 #include <ibrdtn/data/BundleID.h>
00021
00022 #include <ibrcommon/thread/MutexLock.h>
00023 #include <ibrcommon/data/BLOB.h>
00024 #include <ibrcommon/AutoDelete.h>
00025 #include <ibrcommon/Logger.h>
00026
00027 namespace dtn
00028 {
00029 namespace core
00030 {
00031 void sql_tracer(void*, const char * pQuery)
00032 {
00033 IBRCOMMON_LOGGER_DEBUG(50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
00034 }
00035
00036 const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] =
00037 { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes" };
00038
00039 const std::string SQLiteBundleStorage::_sql_queries[SQL_QUERIES_END] =
00040 {
00041 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC LIMIT ?,?;",
00042 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL LIMIT 1;",
00043 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;",
00044 "SELECT * FROM " + _tables[SQL_TABLE_BUNDLE] + " WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset != NULL ORDER BY fragmentoffset ASC;",
00045
00046 "SELECT source, timestamp, sequencenumber, fragmentoffset, procflags FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00047 "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.expiretime <= ?;",
00048 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00049 "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;",
00050
00051 "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;",
00052 "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00053
00054 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00055 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00056 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00057 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);",
00058 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00059 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00060
00061 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00062
00063 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL ORDER BY ordernumber ASC;",
00064 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;",
00065 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL AND ordernumber = ?;",
00066 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;",
00067 "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";",
00068 "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);",
00069
00070 #ifdef SQLITE_STORAGE_EXTENDED
00071 "SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00072 "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00073 "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";",
00074 "INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);",
00075
00076 "SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00077 "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00078 "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";",
00079 "INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);",
00080
00081 "SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00082 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00083 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);",
00084 #endif
00085
00086 "VACUUM;"
00087 };
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104 const std::string SQLiteBundleStorage::_db_structure[10] =
00105 {
00106 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
00107 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `appdatalength` INTEGER DEFAULT NULL, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL);",
00108 "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
00109 "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
00110 "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
00111 "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND ((" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset IS NULL AND old.fragmentoffset IS NULL) OR (" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset)); END;",
00112 "CREATE UNIQUE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);",
00113 "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);",
00114 "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);",
00115 "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);"
00116 "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);"
00117 };
00118
00119 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
00120 bool SQLiteBundleStorage::TaskIdle::_idle = false;
00121
00122 SQLiteBundleStorage::SQLBundleQuery::SQLBundleQuery()
00123 : _statement(NULL)
00124 { }
00125
00126 SQLiteBundleStorage::SQLBundleQuery::~SQLBundleQuery()
00127 {
00128
00129 if (_statement != NULL)
00130 {
00131 sqlite3_finalize(_statement);
00132 }
00133 }
00134
00135 SQLiteBundleStorage::AutoResetLock::AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st)
00136 : _lock(mutex), _st(st)
00137 {
00138
00139 }
00140
00141 SQLiteBundleStorage::AutoResetLock::~AutoResetLock()
00142 {
00143 sqlite3_reset(_st);
00144 }
00145
00146 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size)
00147 : dbPath(path), dbFile(path.get("sqlite.db")), dbSize(size), _next_expiration(0)
00148 {
00149
00150 SQLiteConfigure::configure();
00151
00152
00153 if (sqlite3_threadsafe() == 0)
00154 {
00155 IBRCOMMON_LOGGER(critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
00156 throw ibrcommon::Exception("need threading support in sqlite!");
00157 }
00158 }
00159
00160 SQLiteBundleStorage::~SQLiteBundleStorage()
00161 {
00162 stop();
00163 join();
00164
00165 ibrcommon::MutexLock l(*this);
00166
00167
00168 for (int i = 0; i < SQL_QUERIES_END; i++)
00169 {
00170
00171 sqlite3_finalize(_statements[i]);
00172 }
00173
00174
00175 if (sqlite3_close(_database) != SQLITE_OK)
00176 {
00177 IBRCOMMON_LOGGER(error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL;
00178 }
00179
00180
00181 SQLiteConfigure::shutdown();
00182 }
00183
00184 void SQLiteBundleStorage::openDatabase(const ibrcommon::File &path)
00185 {
00186 ibrcommon::MutexLock l(*this);
00187
00188
00189 _blockPath = dbPath.get(_tables[SQL_TABLE_BLOCK]);
00190
00191
00192 if (sqlite3_open_v2(path.getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
00193 {
00194 IBRCOMMON_LOGGER(error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
00195 sqlite3_close(_database);
00196 throw ibrcommon::Exception("Unable to open sqlite database");
00197 }
00198
00199 int err = 0;
00200
00201
00202 for (size_t i = 0; i < 10; i++)
00203 {
00204 sqlite3_stmt *st = prepare(_db_structure[i]);
00205 err = sqlite3_step(st);
00206 if(err != SQLITE_DONE)
00207 {
00208 IBRCOMMON_LOGGER(error) << "SQLiteBundleStorage: failed to create database" << err << IBRCOMMON_LOGGER_ENDL;
00209 }
00210 sqlite3_reset(st);
00211 sqlite3_finalize(st);
00212 }
00213
00214
00215 ibrcommon::File::createDirectory( _blockPath );
00216
00217
00218 for (int i = 0; i < SQL_QUERIES_END; i++)
00219 {
00220
00221 int err = sqlite3_prepare_v2(_database, _sql_queries[i].c_str(), _sql_queries[i].length(), &_statements[i], 0);
00222 if ( err != SQLITE_OK )
00223 {
00224 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failed to prepare statement: " << err << " with query: " << _sql_queries[i] << IBRCOMMON_LOGGER_ENDL;
00225 }
00226 }
00227
00228
00229 sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL);
00230
00231
00232 if (IBRCOMMON_LOGGER_LEVEL >= 50)
00233 {
00234 sqlite3_trace(_database, &sql_tracer, NULL);
00235 }
00236 }
00237
00238 void SQLiteBundleStorage::componentRun()
00239 {
00240
00241 try {
00242 while (true)
00243 {
00244 Task *t = _tasks.getnpop(true);
00245
00246 try {
00247 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
00248 try {
00249 btask.run(*this);
00250 } catch (const std::exception&) {
00251 btask.abort();
00252 continue;
00253 };
00254 btask.done();
00255 continue;
00256 } catch (const std::bad_cast&) { };
00257
00258 try {
00259 ibrcommon::AutoDelete<Task> killer(t);
00260 t->run(*this);
00261 } catch (const std::exception&) { };
00262 }
00263 } catch (const ibrcommon::QueueUnblockedException &ex) {
00264
00265 }
00266 }
00267
00268 void SQLiteBundleStorage::componentUp()
00269 {
00270
00271 bindEvent(TimeEvent::className);
00272 bindEvent(GlobalEvent::className);
00273
00274
00275 openDatabase(dbFile);
00276
00277
00278 check_consistency();
00279
00280
00281 update_expire_time();
00282 };
00283
00284 void SQLiteBundleStorage::componentDown()
00285 {
00286
00287 unbindEvent(TimeEvent::className);
00288 unbindEvent(GlobalEvent::className);
00289 };
00290
00291 bool SQLiteBundleStorage::__cancellation()
00292 {
00293 _tasks.abort();
00294 return true;
00295 }
00296
00297 void SQLiteBundleStorage::check_consistency()
00298 {
00299
00300
00301
00302
00303
00304
00305 int err;
00306 size_t timestamp, sequencenumber, offset, pFlags, appdata, lifetime;
00307 set<string> blockFiles,payloadfiles;
00308 string datei;
00309
00310 list<ibrcommon::File> blist;
00311 list<ibrcommon::File>::iterator file_it;
00312
00313
00314 _blockPath.getFiles(blist);
00315 for(file_it = blist.begin(); file_it != blist.end(); file_it++)
00316 {
00317 datei = (*file_it).getPath();
00318 if(datei[datei.size()-1] != '.')
00319 {
00320 if(((*file_it).getBasename())[0] == 'b')
00321 {
00322 blockFiles.insert(datei);
00323 }
00324 else
00325 payloadfiles.insert(datei);
00326 }
00327 }
00328
00329
00330 check_bundles(blockFiles);
00331
00332
00333 check_fragments(payloadfiles);
00334 }
00335
00336 void SQLiteBundleStorage::check_fragments(std::set<std::string> &payloadFiles)
00337 {
00338 int err;
00339 size_t procFlags, timestamp, sequencenumber, appdatalength, lifetime;
00340 string filename, source, dest, custody, repto;
00341 set<string> consistenFiles, inconsistenSources;
00342 set<size_t> inconsistentTimestamp, inconsistentSeq_number;
00343 set<string>::iterator file_it, consisten_it;
00344
00345 sqlite3_stmt *getPayloadfiles = prepare("SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+_tables[SQL_TABLE_BUNDLE]+" WHERE fragmentoffset != NULL;");
00346
00347 for(err = sqlite3_step(getPayloadfiles); err == SQLITE_ROW; err = sqlite3_step(getPayloadfiles))
00348 {
00349 filename = (const char*)sqlite3_column_text(getPayloadfiles,10);
00350 file_it = payloadFiles.find(filename);
00351
00352
00353 if (file_it == payloadFiles.end())
00354 {
00355 consisten_it = consistenFiles.find(*file_it);
00356
00357
00358 if(consisten_it == consistenFiles.end()){
00359
00360 source = (const char*) sqlite3_column_text(getPayloadfiles,0);
00361 dest = (const char*) sqlite3_column_text(getPayloadfiles,1);
00362 repto = (const char*) sqlite3_column_text(getPayloadfiles,2);
00363 custody = (const char*) sqlite3_column_text(getPayloadfiles,3);
00364 procFlags = sqlite3_column_int64(getPayloadfiles,4);
00365 timestamp = sqlite3_column_int64(getPayloadfiles,5);
00366 sequencenumber = sqlite3_column_int64(getPayloadfiles,6);
00367 lifetime = sqlite3_column_int64(getPayloadfiles,7);
00368 appdatalength = sqlite3_column_int64(getPayloadfiles,9);
00369
00370 dtn::data::BundleID id(dtn::data::EID(source),timestamp,sequencenumber);
00371 dtn::data::MetaBundle mb(id, lifetime, dtn::data::DTNTime(), dtn::data::EID(dest), dtn::data::EID(repto), dtn::data::EID(custody), appdatalength, procFlags);
00372 dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00373 }
00374 }
00375
00376 else
00377 consistenFiles.insert(*file_it);
00378 payloadFiles.erase(file_it);
00379 }
00380
00381 sqlite3_reset(getPayloadfiles);
00382 sqlite3_finalize(getPayloadfiles);
00383 }
00384
00385 void SQLiteBundleStorage::check_bundles(std::set<std::string> &blockFiles)
00386 {
00387 std::set<dtn::data::BundleID> corrupt_bundle_ids;
00388
00389 sqlite3_stmt *blockConistencyCheck = prepare("SELECT source_id, timestamp, sequencenumber, fragmentoffset, filename, ordernumber FROM "+ _tables[SQL_TABLE_BLOCK] +";");
00390
00391 while (sqlite3_step(blockConistencyCheck) == SQLITE_ROW)
00392 {
00393 dtn::data::BundleID id;
00394
00395
00396 get_bundleid(blockConistencyCheck, id);
00397
00398
00399 std::string filename = (const char*)sqlite3_column_text(blockConistencyCheck, 4);
00400
00401
00402 if (blockFiles.find(filename) == blockFiles.end())
00403 {
00404
00405 corrupt_bundle_ids.insert(id);
00406 }
00407 else
00408 {
00409
00410 blockFiles.erase(filename);
00411 }
00412 }
00413 sqlite3_reset(blockConistencyCheck);
00414 sqlite3_finalize(blockConistencyCheck);
00415
00416 for(std::set<dtn::data::BundleID>::const_iterator iter = corrupt_bundle_ids.begin(); iter != corrupt_bundle_ids.end(); iter++)
00417 {
00418 const dtn::data::BundleID &id = (*iter);
00419
00420
00421 const dtn::data::MetaBundle m = get_meta(id);
00422
00423
00424 remove(id);
00425 }
00426
00427
00428 for (std::set<std::string>::const_iterator iter = blockFiles.begin(); iter != blockFiles.end(); iter++)
00429 {
00430 ibrcommon::File blockfile(*iter);
00431 blockfile.remove();
00432 }
00433 }
00434
00435
00436 sqlite3_stmt* SQLiteBundleStorage::prepare(const std::string &sqlquery)
00437 {
00438
00439 sqlite3_stmt *statement;
00440 int err = sqlite3_prepare_v2(_database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00441 if ( err != SQLITE_OK )
00442 {
00443 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << IBRCOMMON_LOGGER_ENDL;
00444 }
00445 return statement;
00446 }
00447
00448 dtn::data::MetaBundle SQLiteBundleStorage::get_meta(const dtn::data::BundleID &id)
00449 {
00450 dtn::data::MetaBundle bundle;
00451
00452
00453 if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00454
00455 size_t stmt_key = BUNDLE_GET_ID;
00456 if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00457
00458
00459 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00460
00461
00462 set_bundleid(_statements[stmt_key], id);
00463
00464
00465 if (sqlite3_step(_statements[stmt_key]) != SQLITE_ROW)
00466 {
00467 stringstream error;
00468 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString();
00469 IBRCOMMON_LOGGER_DEBUG(15) << error.str() << IBRCOMMON_LOGGER_ENDL;
00470 throw SQLiteQueryException(error.str());
00471 }
00472
00473
00474 get(_statements[stmt_key], bundle);
00475
00476 return bundle;
00477 }
00478
00479 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset)
00480 {
00481 bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00482 bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00483 bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00484 bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00485 bundle.procflags = sqlite3_column_int(st, offset + 4);
00486 bundle.timestamp = sqlite3_column_int64(st, offset + 5);
00487 bundle.sequencenumber = sqlite3_column_int64(st, offset + 6);
00488 bundle.lifetime = sqlite3_column_int64(st, offset + 7);
00489
00490 if (bundle.procflags & data::Bundle::FRAGMENT)
00491 {
00492 bundle.offset = sqlite3_column_int64(st, offset + 8);
00493 bundle.appdatalength = sqlite3_column_int64(st, offset + 9);
00494 }
00495 }
00496
00497 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset)
00498 {
00499 bundle._source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00500 bundle._destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00501 bundle._reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00502 bundle._custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00503 bundle._procflags = sqlite3_column_int(st, offset + 4);
00504 bundle._timestamp = sqlite3_column_int64(st, offset + 5);
00505 bundle._sequencenumber = sqlite3_column_int64(st, offset + 6);
00506 bundle._lifetime = sqlite3_column_int64(st, offset + 7);
00507
00508 if (bundle._procflags & data::Bundle::FRAGMENT)
00509 {
00510 bundle._fragmentoffset = sqlite3_column_int64(st, offset + 8);
00511 bundle._appdatalength = sqlite3_column_int64(st, offset + 9);
00512 }
00513 }
00514
00515 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb)
00516 {
00517 std::list<dtn::data::MetaBundle> ret;
00518
00519 const std::string base_query =
00520 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE];
00521
00522 sqlite3_stmt *st = NULL;
00523 size_t bind_offset = 1;
00524
00525 try {
00526 SQLBundleQuery &query = dynamic_cast<SQLBundleQuery&>(cb);
00527
00528 if (query._statement == NULL)
00529 {
00530 std::string sql = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC LIMIT ?,?;";
00531 query._statement = prepare(sql);
00532 }
00533
00534 st = query._statement;
00535 bind_offset = query.bind(st, 1);
00536 } catch (const std::bad_cast&) {
00537
00538 st = _statements[BUNDLE_GET_FILTER];
00539 };
00540
00541 int offset = 0;
00542 while (ret.size() != cb.limit())
00543 {
00544
00545 AutoResetLock l(_locks[BUNDLE_GET_FILTER], st);
00546
00547 sqlite3_bind_int64(st, bind_offset, offset);
00548 sqlite3_bind_int64(st, bind_offset + 1, cb.limit());
00549
00550 if (sqlite3_step(st) == SQLITE_DONE)
00551 {
00552
00553 break;
00554 }
00555
00556 while (ret.size() != cb.limit())
00557 {
00558 dtn::data::MetaBundle m;
00559
00560
00561 get(st, m, 0);
00562
00563
00564 if (!contains_deletion(m))
00565 {
00566
00567 if (cb.shouldAdd(m))
00568 {
00569 IBRCOMMON_LOGGER_DEBUG(40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL;
00570
00571
00572 ret.push_back(m);
00573 }
00574 }
00575
00576 if (sqlite3_step(st) != SQLITE_ROW)
00577 {
00578 break;
00579 }
00580 }
00581
00582
00583 offset += cb.limit();
00584 }
00585
00586 return ret;
00587 }
00588
00589 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id)
00590 {
00591 dtn::data::Bundle bundle;
00592 int err = 0;
00593
00594 IBRCOMMON_LOGGER_DEBUG(25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00595
00596
00597 if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00598
00599 size_t stmt_key = BUNDLE_GET_ID;
00600 if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00601
00602
00603 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00604
00605
00606 set_bundleid(_statements[stmt_key], id);
00607
00608
00609 if ((err = sqlite3_step(_statements[stmt_key])) != SQLITE_ROW)
00610 {
00611 IBRCOMMON_LOGGER_DEBUG(15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00612 throw dtn::core::BundleStorage::NoBundleFoundException();
00613 }
00614
00615
00616 get(_statements[stmt_key], bundle);
00617
00618 try {
00619
00620 get_blocks(bundle, id);
00621 } catch (const ibrcommon::Exception &ex) {
00622 IBRCOMMON_LOGGER(error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00623 throw dtn::core::BundleStorage::NoBundleFoundException();
00624 }
00625
00626 return bundle;
00627 }
00628
00629 #ifdef SQLITE_STORAGE_EXTENDED
/**
 * Changes the priority bits in the processing flags of a stored bundle
 * (only compiled with SQLITE_STORAGE_EXTENDED).
 *
 * NOTE(review): this function does not look usable as written, see the
 * notes in the body. No query text for a PROCFLAGS_GET key is visible in
 * _sql_queries, and the visible "SET procflags" statement expects the flags
 * as parameter 1 followed by four id columns, which does not match the
 * bindings below.
 *
 * @param priority 1, 2 or 3 select the priority bits to set; other values
 *                 leave both bits cleared
 * @param id       identifier of the bundle
 * @throws SQLiteQueryException see the notes in the body
 */
void SQLiteBundleStorage::setPriority(const int priority, const dtn::data::BundleID &id)
{
    int err;
    size_t procflags;
    {
        // read the current processing flags
        sqlite3_bind_text(_statements[PROCFLAGS_GET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
        err = sqlite3_step(_statements[PROCFLAGS_GET]);
        if(err == SQLITE_ROW)
            procflags = sqlite3_column_int(_statements[PROCFLAGS_GET],0);
        sqlite3_reset(_statements[PROCFLAGS_GET]);
        // NOTE(review): when a row was found err is SQLITE_ROW, so this throws
        // in the success case. When no row was found (SQLITE_DONE) execution
        // continues with an uninitialised procflags.
        if(err != SQLITE_DONE){
            stringstream error;
            error << "SQLiteBundleStorage: error while Select Querry: " << err;
            IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
            throw SQLiteQueryException(error.str());
        }

        // NOTE(review): presumably meant to clear both priority bits; the
        // arithmetic only does that for particular bit values - verify, a
        // mask with &= ~(BIT1 | BIT2) would be the usual form
        procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2));

        // set the bits that belong to the requested priority
        switch(priority){
        case 1: procflags += data::Bundle::PRIORITY_BIT1; break;
        case 2: procflags += data::Bundle::PRIORITY_BIT2; break;
        case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break;
        }

        // NOTE(review): the computed procflags value is never bound; the raw
        // priority is bound instead
        sqlite3_bind_text(_statements[PROCFLAGS_SET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
        sqlite3_bind_int64(_statements[PROCFLAGS_SET],2,priority);
        sqlite3_step(_statements[PROCFLAGS_SET]);
        sqlite3_reset(_statements[PROCFLAGS_SET]);

        // NOTE(review): err still holds the result of the SELECT above; the
        // result of the update step is discarded
        if(err != SQLITE_DONE)
        {
            stringstream error;
            error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err;
            IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
            throw SQLiteQueryException(error.str());
        }
    }
}
00671 #endif
00672
00673 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle)
00674 {
00675 IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00676
00677
00678 bool local = (bundle._destination.sameHost(BundleCore::local)
00679 && (bundle._procflags & dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON));
00680
00681
00682
00683
00684
00685
00686
00687
00688 int err;
00689
00690 const dtn::data::EID _sourceid = bundle._source;
00691 size_t TTL = bundle._timestamp + bundle._lifetime;
00692
00693 AutoResetLock l(_locks[BUNDLE_STORE], _statements[BUNDLE_STORE]);
00694
00695 set_bundleid(_statements[BUNDLE_STORE], bundle);
00696
00697 sqlite3_bind_text(_statements[BUNDLE_STORE], 5, bundle._source.getString().c_str(), bundle._source.getString().length(), SQLITE_TRANSIENT);
00698 sqlite3_bind_text(_statements[BUNDLE_STORE], 6, bundle._destination.getString().c_str(), bundle._destination.getString().length(), SQLITE_TRANSIENT);
00699 sqlite3_bind_text(_statements[BUNDLE_STORE], 7, bundle._reportto.getString().c_str(), bundle._reportto.getString().length(), SQLITE_TRANSIENT);
00700 sqlite3_bind_text(_statements[BUNDLE_STORE], 8, bundle._custodian.getString().c_str(), bundle._custodian.getString().length(), SQLITE_TRANSIENT);
00701 sqlite3_bind_int(_statements[BUNDLE_STORE], 9, bundle._procflags);
00702 sqlite3_bind_int64(_statements[BUNDLE_STORE], 10, bundle._lifetime);
00703
00704 if (bundle.get(dtn::data::Bundle::FRAGMENT))
00705 {
00706 sqlite3_bind_int64(_statements[BUNDLE_STORE], 11, bundle._appdatalength);
00707 }
00708 else
00709 {
00710 sqlite3_bind_null(_statements[BUNDLE_STORE], 4);
00711 sqlite3_bind_null(_statements[BUNDLE_STORE], 11);
00712 }
00713
00714 sqlite3_bind_int64(_statements[BUNDLE_STORE], 12, TTL);
00715 sqlite3_bind_int64(_statements[BUNDLE_STORE], 13, dtn::data::MetaBundle(bundle).getPriority());
00716
00717
00718 sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, NULL);
00719
00720
00721 err = sqlite3_step(_statements[BUNDLE_STORE]);
00722
00723 if (err == SQLITE_CONSTRAINT)
00724 {
00725 IBRCOMMON_LOGGER(warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
00726 }
00727 else if (err != SQLITE_DONE)
00728 {
00729 stringstream error;
00730 error << "SQLiteBundleStorage: store() failure: " << err << " " << sqlite3_errmsg(_database);
00731 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00732
00733
00734 sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, NULL);
00735
00736 throw SQLiteQueryException(error.str());
00737 }
00738 else
00739 {
00740
00741 store_blocks(bundle);
00742
00743 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
00744 }
00745
00746
00747 sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, NULL);
00748
00749
00750 new_expire_time(TTL);
00751
00752 try {
00753
00754 const dtn::data::EID custodian = acceptCustody(bundle);
00755
00756
00757 update_custodian(bundle, custodian);
00758 } catch (const ibrcommon::Exception&) {
00759
00760 }
00761 }
00762
00763 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id)
00764 {
00765 add_deletion(id);
00766 _tasks.push(new TaskRemove(id));
00767 }
00768
00769 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage)
00770 {
00771 {
00772
00773 const size_t stmt_key = _id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
00774
00775
00776 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00777
00778
00779 storage.set_bundleid(storage._statements[stmt_key], _id);
00780
00781
00782 while (sqlite3_step(storage._statements[stmt_key]) == SQLITE_ROW)
00783 {
00784
00785 ibrcommon::File blockfile( (const char*)sqlite3_column_text(storage._statements[stmt_key], 0) );
00786 blockfile.remove();
00787 }
00788 }
00789
00790 {
00791 const size_t stmt_key = _id.fragment ? FRAGMENT_DELETE : BUNDLE_DELETE;
00792
00793
00794 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00795
00796
00797 storage.set_bundleid(storage._statements[stmt_key], _id);
00798 sqlite3_step(storage._statements[stmt_key]);
00799
00800 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << _id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL;
00801 }
00802
00803
00804 storage.update_expire_time();
00805
00806
00807 storage.remove_deletion(_id);
00808 }
00809
00810 #ifdef SQLITE_STORAGE_EXTENDED
00811 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00812 {
00813 int err;
00814 string result;
00815
00816 sqlite3_bind_text(_statements[ROUTING_GET],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00817 sqlite3_bind_int(_statements[ROUTING_GET],2,key);
00818 err = sqlite3_step(_statements[ROUTING_GET]);
00819 if(err == SQLITE_ROW){
00820 result = (const char*) sqlite3_column_text(_statements[ROUTING_GET],0);
00821 }
00822 sqlite3_reset(_statements[ROUTING_GET]);
00823 if(err != SQLITE_DONE && err != SQLITE_ROW){
00824 stringstream error;
00825 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err;
00826 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00827 throw SQLiteQueryException(error.str());
00828 }
00829
00830 return result;
00831 }
00832
00833 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key)
00834 {
00835 int err;
00836 string result;
00837
00838 err = sqlite3_bind_text(_statements[NODE_GET],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00839 err = sqlite3_bind_int(_statements[NODE_GET],2,key);
00840 err = sqlite3_step(_statements[NODE_GET]);
00841 if(err == SQLITE_ROW){
00842 result = (const char*) sqlite3_column_text(_statements[NODE_GET],0);
00843 }
00844 sqlite3_reset(_statements[NODE_GET]);
00845 if(err != SQLITE_DONE && err != SQLITE_ROW){
00846 stringstream error;
00847 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00848 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00849 throw SQLiteQueryException(error.str());
00850 }
00851
00852 return result;
00853 }
00854
00855 std::string SQLiteBundleStorage::getRoutingInfo(const int &key)
00856 {
00857 int err;
00858 string result;
00859
00860 sqlite3_bind_int(_statements[INFO_GET],1,key);
00861 err = sqlite3_step(_statements[INFO_GET]);
00862 if(err == SQLITE_ROW){
00863 result = (const char*) sqlite3_column_text(_statements[INFO_GET],0);
00864 }
00865 sqlite3_reset(_statements[INFO_GET]);
00866 if(err != SQLITE_DONE && err != SQLITE_ROW){
00867 stringstream error;
00868 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00869 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00870 throw SQLiteQueryException(error.str());
00871 }
00872
00873 return result;
00874 }
00875
00876 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo)
00877 {
00878 int err;
00879
00880 sqlite3_bind_text(_statements[ROUTING_STORE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00881 sqlite3_bind_int(_statements[ROUTING_STORE],2,key);
00882 sqlite3_bind_text(_statements[ROUTING_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00883 err = sqlite3_step(_statements[ROUTING_STORE]);
00884 sqlite3_reset(_statements[ROUTING_STORE]);
00885 if(err == SQLITE_CONSTRAINT)
00886 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl;
00887 else if(err != SQLITE_DONE){
00888 stringstream error;
00889 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err;
00890 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00891 throw SQLiteQueryException(error.str());
00892 }
00893 }
00894
00895 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo)
00896 {
00897 int err;
00898
00899 sqlite3_bind_text(_statements[NODE_STORE],1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT);
00900 sqlite3_bind_int(_statements[NODE_STORE],2,key);
00901 sqlite3_bind_text(_statements[NODE_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00902 err = sqlite3_step(_statements[NODE_STORE]);
00903 sqlite3_reset(_statements[NODE_STORE]);
00904 if(err == SQLITE_CONSTRAINT)
00905 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl;
00906 else if(err != SQLITE_DONE){
00907 stringstream error;
00908 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err;
00909 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00910 throw SQLiteQueryException(error.str());
00911 }
00912 }
00913
00914 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo)
00915 {
00916 int err;
00917
00918 sqlite3_bind_int(_statements[INFO_STORE],1,key);
00919 sqlite3_bind_text(_statements[INFO_STORE],2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00920 err = sqlite3_step(_statements[INFO_STORE]);
00921 sqlite3_reset(_statements[INFO_STORE]);
00922 if(err == SQLITE_CONSTRAINT)
00923 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl;
00924 else if(err != SQLITE_DONE){
00925 stringstream error;
00926 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err;
00927 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00928 throw SQLiteQueryException(error.str());
00929 }
00930 }
00931
00932 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00933 {
00934 int err;
00935
00936 sqlite3_bind_text(_statements[ROUTING_REMOVE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00937 sqlite3_bind_int(_statements[ROUTING_REMOVE],2,key);
00938 err = sqlite3_step(_statements[ROUTING_REMOVE]);
00939 sqlite3_reset(_statements[ROUTING_REMOVE]);
00940 if(err != SQLITE_DONE){
00941 stringstream error;
00942 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err;
00943 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00944 throw SQLiteQueryException(error.str());
00945 }
00946 }
00947
00948 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key)
00949 {
00950 int err;
00951
00952 sqlite3_bind_text(_statements[NODE_REMOVE],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00953 sqlite3_bind_int(_statements[NODE_REMOVE],2,key);
00954 err = sqlite3_step(_statements[NODE_REMOVE]);
00955 sqlite3_reset(_statements[NODE_REMOVE]);
00956 if(err != SQLITE_DONE){
00957 stringstream error;
00958 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err;
00959 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00960 throw SQLiteQueryException(error.str());
00961 }
00962 }
00963
00964 void SQLiteBundleStorage::removeRoutingInfo(const int &key)
00965 {
00966 int err;
00967
00968 sqlite3_bind_int(_statements[INFO_REMOVE],1,key);
00969 err = sqlite3_step(_statements[INFO_REMOVE]);
00970 sqlite3_reset(_statements[INFO_REMOVE]);
00971 if(err != SQLITE_DONE){
00972 stringstream error;
00973 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err;
00974 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00975 throw SQLiteQueryException(error.str());
00976 }
00977 }
00978 #endif
00979
00980 void SQLiteBundleStorage::clearAll()
00981 {
00982 #ifdef SQLITE_STORAGE_EXTENDED
00983 sqlite3_step(_statements[ROUTING_CLEAR]);
00984 sqlite3_reset(_statements[ROUTING_CLEAR]);
00985 sqlite3_step(_statements[NODE_CLEAR]);
00986 sqlite3_reset(_statements[NODE_CLEAR]);
00987 #endif
00988
00989 clear();
00990 }
00991
00992 void SQLiteBundleStorage::clear()
00993 {
00994 AutoResetLock l(_locks[VACUUM], _statements[VACUUM]);
00995
00996 sqlite3_step(_statements[BUNDLE_CLEAR]);
00997 sqlite3_reset(_statements[BUNDLE_CLEAR]);
00998 sqlite3_step(_statements[BLOCK_CLEAR]);
00999 sqlite3_reset(_statements[BLOCK_CLEAR]);
01000
01001 if(SQLITE_DONE != sqlite3_step(_statements[VACUUM]))
01002 {
01003 sqlite3_reset(_statements[VACUUM]);
01004 throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed.");
01005 }
01006
01007
01008 {
01009 _blockPath.remove(true);
01010 ibrcommon::File::createDirectory(_blockPath);
01011 }
01012
01013 reset_expire_time();
01014 }
01015
01016
01017
01018 bool SQLiteBundleStorage::empty()
01019 {
01020 AutoResetLock l(_locks[EMPTY_CHECK], _statements[EMPTY_CHECK]);
01021
01022 if (SQLITE_DONE == sqlite3_step(_statements[EMPTY_CHECK]))
01023 {
01024 return true;
01025 }
01026 else
01027 {
01028 return false;
01029 }
01030 }
01031
01032 unsigned int SQLiteBundleStorage::count()
01033 {
01034 int rows = 0;
01035 int err = 0;
01036
01037 AutoResetLock l(_locks[COUNT_ENTRIES], _statements[COUNT_ENTRIES]);
01038
01039 if ((err = sqlite3_step(_statements[COUNT_ENTRIES])) == SQLITE_ROW)
01040 {
01041 rows = sqlite3_column_int(_statements[COUNT_ENTRIES], 0);
01042 }
01043 else
01044 {
01045 stringstream error;
01046 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(_database);
01047 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01048 throw SQLiteQueryException(error.str());
01049 }
01050
01051 return rows;
01052 }
01053
01054 #ifdef SQLITE_STORAGE_EXTENDED
01055 int SQLiteBundleStorage::occupiedSpace()
01056 {
01057 int size = 0;
01058 std::list<ibrcommon::File> files;
01059
01060 if (_blockPath.getFiles(files))
01061 {
01062 IBRCOMMON_LOGGER(error) << "occupiedSpace: unable to open Directory " << _blockPath.getPath() << IBRCOMMON_LOGGER_ENDL;
01063 return -1;
01064 }
01065
01066 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
01067 {
01068 const ibrcommon::File &f = (*iter);
01069
01070 if (!f.isSystem())
01071 {
01072 size += f.size();
01073 }
01074 }
01075
01076
01077 size += dbFile.size();
01078
01079 return size;
01080 }
01081
01082 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle)
01083 {
01084 int err = 0;
01085 size_t bitcounter = 0;
01086 bool allFragmentsReceived(true);
01087 size_t payloadsize, TTL, fragmentoffset;
01088
01089 ibrcommon::File payloadfile, filename;
01090
01091
01092 std::string bundleID = dtn::data::BundleID(bundle).toString();
01093
01094
01095 std::string destination = bundle._destination.getString();
01096
01097
01098 std::string sourceEID = bundle._source.getString();
01099
01100
01101 TTL = bundle._timestamp + bundle._lifetime;
01102
01103 const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>();
01104 ibrcommon::BLOB::Reference payloadBlob = block.getBLOB();
01105
01106
01107 {
01108 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01109 payloadsize = io.size();
01110 }
01111
01112
01113 bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength;
01114 bool first = bundle._fragmentoffset == 0;
01115
01116 if(first || last)
01117 {
01118 int i = 0, blocknumber = 0;
01119
01120
01121 std::list<const dtn::data::Block*> blocklist = bundle.getBlocks();
01122
01123
01124 for (std::list<const dtn::data::Block* >::const_iterator it = blocklist.begin(); it != blocklist.end(); it++, i++)
01125 {
01126
01127 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01128
01129 ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01130
01131 dtn::data::SeparateSerializer serializer(datei);
01132 serializer << (*(*it));
01133 datei.close();
01134
01135
01136
01137
01138
01139
01140
01141 sqlite3_bind_text(_statements[BLOCK_STORE], 1, bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01142 sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01143 sqlite3_bind_text(_statements[BLOCK_STORE], 3, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01144
01145 blocknumber = blocklist.size() - i;
01146
01147 if (first)
01148 {
01149 blocknumber = (blocklist.size() - i)*(-1);
01150 }
01151
01152 sqlite3_bind_int(_statements[BLOCK_STORE],4,blocknumber);
01153 executeQuery(_statements[BLOCK_STORE]);
01154 }
01155 }
01156
01157
01158
01159 sqlite3_bind_text(_statements[BUNDLE_GET_FRAGMENT],1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT);
01160 sqlite3_bind_int64(_statements[BUNDLE_GET_FRAGMENT],2,bundle._timestamp);
01161 sqlite3_bind_int(_statements[BUNDLE_GET_FRAGMENT],3,bundle._sequencenumber);
01162 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01163
01164
01165 if(err == SQLITE_DONE)
01166 {
01167
01168 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01169 payloadfile = tmpfile;
01170 }
01171
01172 else if (err == SQLITE_ROW)
01173 {
01174 payloadfile = ibrcommon::File( (const char*)sqlite3_column_text(_statements[BUNDLE_GET_FRAGMENT], 14) );
01175 }
01176
01177 while (err == SQLITE_ROW){
01178
01179
01180 fragmentoffset = sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],9);
01181
01182 if(bitcounter >= fragmentoffset){
01183 bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],13);
01184 }
01185 else if(bitcounter >= bundle._fragmentoffset){
01186 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01187 }
01188 else{
01189 allFragmentsReceived = false;
01190 }
01191 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01192 }
01193
01194 if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived)
01195 {
01196 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01197 }
01198
01199 sqlite3_reset(_statements[BUNDLE_GET_FRAGMENT]);
01200 if(err != SQLITE_DONE){
01201 stringstream error;
01202 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(_database);
01203 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01204 throw SQLiteQueryException(error.str());
01205 }
01206
01207
01208 std::fstream datei(payloadfile.getPath().c_str(), std::ios::in | std::ios::out | std::ios::binary);
01209 datei.seekp(bundle._fragmentoffset);
01210 {
01211 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01212
01213 size_t ret = 0;
01214 istream &s = (*io);
01215 s.seekg(0);
01216
01217 while (true)
01218 {
01219 char buf;
01220 s.get(buf);
01221 if(!s.eof()){
01222 datei.put(buf);
01223 ret++;
01224 }
01225 else
01226 break;
01227 }
01228 datei.close();
01229 }
01230
01231
01232 if(bundle._appdatalength == bitcounter)
01233 {
01234
01235
01236
01237
01238
01239
01240
01241
01242 {
01243 ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(payloadfile));
01244 dtn::data::PayloadBlock pb(ref);
01245
01246
01247 if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS))
01248 {
01249 std::list<dtn::data::EID> eidlist;
01250 std::list<dtn::data::EID>::iterator eidIt;
01251 eidlist = block.getEIDList();
01252
01253 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++)
01254 {
01255 pb.addEID(*eidIt);
01256 }
01257 }
01258
01259
01260 pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT));
01261 pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED));
01262 pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED));
01263 pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK));
01264 pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED));
01265 pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED));
01266 pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS));
01267
01268
01269 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01270
01271 std::ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01272 dtn::data::SeparateSerializer serializer(datei);
01273 serializer << (pb);
01274 datei.close();
01275
01276 filename = tmpfile;
01277 }
01278
01279 sqlite3_bind_text(_statements[BLOCK_STORE], 1,bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01280
01281 sqlite3_bind_text(_statements[BLOCK_STORE], 3, filename.getPath().c_str(), filename.getPath().length(), SQLITE_TRANSIENT);
01282 sqlite3_bind_int(_statements[BLOCK_STORE], 4, 0);
01283 executeQuery(_statements[BLOCK_STORE]);
01284
01285
01286 payloadfile.remove();
01287
01288
01289 sqlite3_bind_text(_statements[BUNDLE_DELETE],1,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01290 sqlite3_bind_int64(_statements[BUNDLE_DELETE],2,bundle._sequencenumber);
01291 sqlite3_bind_int64(_statements[BUNDLE_DELETE],3,bundle._timestamp);
01292 executeQuery(_statements[BUNDLE_DELETE]);
01293
01294
01295 size_t procFlags = bundle._procflags;
01296 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT);
01297
01298 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01299 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01300 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01301 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01302 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01303 sqlite3_bind_int(_statements[BUNDLE_STORE],6,procFlags);
01304 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01305 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01306 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01307 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01308 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01309 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01310 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01311 sqlite3_bind_int(_statements[BUNDLE_STORE],14,NULL);
01312 sqlite3_bind_text(_statements[BUNDLE_STORE],15,NULL,0,SQLITE_TRANSIENT);
01313 executeQuery(_statements[BUNDLE_STORE]);
01314 }
01315
01316
01317 else{
01318 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01319 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01320 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01321 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01322 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01323 sqlite3_bind_int(_statements[BUNDLE_STORE],6,bundle._procflags);
01324 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01325 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01326 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01327 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01328 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01329 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01330 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01331 sqlite3_bind_int(_statements[BUNDLE_STORE],14,payloadsize);
01332 sqlite3_bind_text(_statements[BUNDLE_STORE],15,payloadfile.getPath().c_str(),payloadfile.getPath().length(),SQLITE_TRANSIENT);
01333 executeQuery(_statements[BUNDLE_STORE]);
01334 }
01335 }
01336 #endif
01337
01338 void SQLiteBundleStorage::raiseEvent(const Event *evt)
01339 {
01340 try {
01341 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
01342
01343 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
01344 {
01345 _tasks.push(new TaskExpire(time.getTimestamp()));
01346 }
01347 } catch (const std::bad_cast&) { }
01348
01349 try {
01350 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt);
01351
01352 if(global.getAction() == GlobalEvent::GLOBAL_IDLE)
01353 {
01354
01355 ibrcommon::MutexLock l(TaskIdle::_mutex);
01356 TaskIdle::_idle = true;
01357
01358
01359 _tasks.push(new TaskIdle());
01360 }
01361 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY)
01362 {
01363
01364 ibrcommon::MutexLock l(TaskIdle::_mutex);
01365 TaskIdle::_idle = false;
01366 }
01367 } catch (const std::bad_cast&) { }
01368 }
01369
01370 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
01371 {
01372
01373
01374
01375 size_t exp_time = storage.get_expire_time();
01376 if ((_timestamp < exp_time) || (exp_time == 0)) return;
01377
01378
01379
01380
01381
01382
01383
01384 {
01385 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_FILENAMES], storage._statements[EXPIRE_BUNDLE_FILENAMES]);
01386
01387
01388 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_FILENAMES], 1, _timestamp);
01389 while (sqlite3_step(storage._statements[EXPIRE_BUNDLE_FILENAMES]) == SQLITE_ROW)
01390 {
01391 ibrcommon::File block((const char*)sqlite3_column_text(storage._statements[EXPIRE_BUNDLE_FILENAMES],0));
01392 block.remove();
01393 }
01394 }
01395
01396 {
01397 AutoResetLock l(storage._locks[EXPIRE_BUNDLES], storage._statements[EXPIRE_BUNDLES]);
01398
01399
01400 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLES], 1, _timestamp);
01401 while (sqlite3_step(storage._statements[EXPIRE_BUNDLES]) == SQLITE_ROW)
01402 {
01403 dtn::data::BundleID id;
01404 storage.get_bundleid(storage._statements[EXPIRE_BUNDLES], id);
01405 dtn::core::BundleExpiredEvent::raise(id);
01406 }
01407 }
01408
01409 {
01410 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_DELETE], storage._statements[EXPIRE_BUNDLE_DELETE]);
01411
01412
01413 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_DELETE], 1, _timestamp);
01414 sqlite3_step(storage._statements[EXPIRE_BUNDLE_DELETE]);
01415 }
01416
01417
01418 storage.update_expire_time();
01419 }
01420
01421 void SQLiteBundleStorage::update_expire_time()
01422 {
01423 ibrcommon::MutexLock l(_locks[EXPIRE_NEXT_TIMESTAMP]);
01424 int err = sqlite3_step(_statements[EXPIRE_NEXT_TIMESTAMP]);
01425
01426 if (err == SQLITE_ROW)
01427 {
01428 ibrcommon::MutexLock l(_next_expiration_lock);
01429 _next_expiration = sqlite3_column_int64(_statements[EXPIRE_NEXT_TIMESTAMP], 0);
01430 }
01431 else
01432 {
01433 ibrcommon::MutexLock l(_next_expiration_lock);
01434 _next_expiration = 0;
01435 }
01436
01437 sqlite3_reset(_statements[EXPIRE_NEXT_TIMESTAMP]);
01438 }
01439
01440 void SQLiteBundleStorage::update_custodian(const dtn::data::BundleID &id, const dtn::data::EID &custodian)
01441 {
01442
01443 STORAGE_STMT query = BUNDLE_UPDATE_CUSTODIAN;
01444 if (id.fragment) query = FRAGMENT_UPDATE_CUSTODIAN;
01445
01446
01447 AutoResetLock l(_locks[query], _statements[query]);
01448
01449 sqlite3_bind_text(_statements[query], 1, custodian.getString().c_str(), custodian.getString().length(), SQLITE_TRANSIENT);
01450 set_bundleid(_statements[query], id, 1);
01451
01452
01453 int err = sqlite3_step(_statements[query]);
01454
01455 if (err != SQLITE_DONE)
01456 {
01457 stringstream error;
01458 error << "SQLiteBundleStorage: update_custodian() failure: " << err << " " << sqlite3_errmsg(_database);
01459 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01460 }
01461 }
01462
01463 int SQLiteBundleStorage::store_blocks(const data::Bundle &bundle)
01464 {
01465 int blocktyp, blocknumber(1), storedBytes(0);
01466
01467
01468 const list<const data::Block*> blocklist = bundle.getBlocks();
01469
01470
01471 data::BundleID id(bundle);
01472
01473 for(std::list<const data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++)
01474 {
01475 blocktyp = (int)(*it)->getType();
01476
01477 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01478 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary);
01479
01480 dtn::data::SeparateSerializer serializer(filestream);
01481 serializer << (*(*it));
01482 storedBytes += serializer.getLength(*(*it));
01483
01484 filestream.close();
01485
01486
01487 AutoResetLock l(_locks[BLOCK_STORE], _statements[BLOCK_STORE]);
01488
01489
01490 set_bundleid(_statements[BLOCK_STORE], id);
01491
01492
01493 if (!id.fragment) sqlite3_bind_null(_statements[BLOCK_STORE], 4);
01494
01495
01496 sqlite3_bind_int(_statements[BLOCK_STORE], 5, blocktyp);
01497
01498
01499 sqlite3_bind_text(_statements[BLOCK_STORE], 6, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01500
01501
01502 sqlite3_bind_int(_statements[BLOCK_STORE], 7, blocknumber);
01503
01504
01505 if (sqlite3_step(_statements[BLOCK_STORE]) != SQLITE_DONE)
01506 {
01507 throw SQLiteQueryException("can not store block of bundle");
01508 }
01509
01510
01511 blocknumber++;
01512 }
01513
01514 return storedBytes;
01515 }
01516
01517 void SQLiteBundleStorage::get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id)
01518 {
01519 int err = 0;
01520 string file;
01521 std::list<ibrcommon::File> files;
01522
01523
01524 const size_t stmt_key = id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
01525
01526 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
01527
01528
01529 set_bundleid(_statements[stmt_key], id);
01530
01531
01532 while ((err = sqlite3_step(_statements[stmt_key])) == SQLITE_ROW)
01533 {
01534 files.push_back( ibrcommon::File( (const char*) sqlite3_column_text(_statements[stmt_key], 0) ) );
01535 }
01536
01537 if (err == SQLITE_DONE)
01538 {
01539 if (files.size() == 0)
01540 {
01541 IBRCOMMON_LOGGER(error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
01542 throw SQLiteQueryException("no blocks found");
01543 }
01544 }
01545 else
01546 {
01547 IBRCOMMON_LOGGER(error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
01548 throw SQLiteQueryException("can not query for blocks");
01549 }
01550
01551
01552 for (std::list<ibrcommon::File>::const_iterator it = files.begin(); it != files.end(); it++)
01553 {
01554 const ibrcommon::File &f = (*it);
01555
01556
01557 std::ifstream is(f.getPath().c_str(), std::ios::binary | std::ios::in);
01558
01559
01560 dtn::data::SeparateDeserializer(is, bundle).readBlock();
01561
01562
01563 is.close();
01564 }
01565 }
01566
01567 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
01568 {
01569
01570 while (true)
01571 {
01572
01573
01574
01575
01576
01577
01578
01579
01580
01581 {
01582 AutoResetLock l(storage._locks[VACUUM], storage._statements[VACUUM]);
01583 sqlite3_step(storage._statements[VACUUM]);
01584 }
01585
01586
01587 ::sleep(1);
01588
01589 ibrcommon::MutexLock l(TaskIdle::_mutex);
01590 if (!TaskIdle::_idle) return;
01591 }
01592 }
01593
01594 const std::string SQLiteBundleStorage::getName() const
01595 {
01596 return "SQLiteBundleStorage";
01597 }
01598
01599 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
01600 {
01601
01602
01603
01604 update_custodian(id, custodian);
01605 }
01606
01607 void SQLiteBundleStorage::new_expire_time(size_t ttl)
01608 {
01609 ibrcommon::MutexLock l(_next_expiration_lock);
01610 if (_next_expiration == 0 || ttl < _next_expiration)
01611 {
01612 _next_expiration = ttl;
01613 }
01614 }
01615
01616 void SQLiteBundleStorage::reset_expire_time()
01617 {
01618 ibrcommon::MutexLock l(_next_expiration_lock);
01619 _next_expiration = 0;
01620 }
01621
01622 size_t SQLiteBundleStorage::get_expire_time()
01623 {
01624 ibrcommon::MutexLock l(_next_expiration_lock);
01625 return _next_expiration;
01626 }
01627
01628 void SQLiteBundleStorage::add_deletion(const dtn::data::BundleID &id)
01629 {
01630 ibrcommon::MutexLock l(_deletion_mutex);
01631 _deletion_list.insert(id);
01632 }
01633
01634 void SQLiteBundleStorage::remove_deletion(const dtn::data::BundleID &id)
01635 {
01636 ibrcommon::MutexLock l(_deletion_mutex);
01637 _deletion_list.erase(id);
01638 }
01639
01640 bool SQLiteBundleStorage::contains_deletion(const dtn::data::BundleID &id)
01641 {
01642 ibrcommon::MutexLock l(_deletion_mutex);
01643 return (_deletion_list.find(id) != _deletion_list.end());
01644 }
01645
01646 void SQLiteBundleStorage::trylock() throw (ibrcommon::MutexException)
01647 {
01648
01649 throw ibrcommon::MutexException();
01650 }
01651
01652 void SQLiteBundleStorage::enter() throw (ibrcommon::MutexException)
01653 {
01654
01655 for (int i = 0; i < SQL_QUERIES_END; i++)
01656 {
01657
01658 _locks[i].enter();
01659 }
01660 }
01661
01662 void SQLiteBundleStorage::leave() throw (ibrcommon::MutexException)
01663 {
01664
01665 for (int i = 0; i < SQL_QUERIES_END; i++)
01666 {
01667
01668 _locks[i].leave();
01669 }
01670 }
01671
01672 void SQLiteBundleStorage::set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset) const
01673 {
01674 const std::string source_id = id.source.getString();
01675 sqlite3_bind_text(st, offset + 1, source_id.c_str(), source_id.length(), SQLITE_TRANSIENT);
01676 sqlite3_bind_int64(st, offset + 2, id.timestamp);
01677 sqlite3_bind_int64(st, offset + 3, id.sequencenumber);
01678
01679 if (id.fragment)
01680 {
01681 sqlite3_bind_int64(st, offset + 4, id.offset);
01682 }
01683 }
01684
01685 void SQLiteBundleStorage::get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset) const
01686 {
01687 id.source = dtn::data::EID((const char*)sqlite3_column_text(st, offset + 0));
01688 id.timestamp = sqlite3_column_int64(st, offset + 1);
01689 id.sequencenumber = sqlite3_column_int64(st, offset + 2);
01690
01691 id.fragment = (sqlite3_column_text(st, offset + 3) != NULL);
01692 id.offset = sqlite3_column_int64(st, offset + 3);
01693 }
01694 }
01695 }