|
IBR-DTNSuite 0.6
|
00001 /* 00002 * SQLiteBundleStorage.cpp 00003 * 00004 * Created on: 09.01.2010 00005 * Author: Myrtus 00006 */ 00007 00008 00009 #include "core/SQLiteBundleStorage.h" 00010 #include "core/TimeEvent.h" 00011 #include "core/GlobalEvent.h" 00012 #include "core/BundleCore.h" 00013 #include "core/SQLiteConfigure.h" 00014 #include "core/BundleEvent.h" 00015 #include "core/BundleExpiredEvent.h" 00016 00017 #include <ibrdtn/data/PayloadBlock.h> 00018 #include <ibrdtn/data/AgeBlock.h> 00019 #include <ibrdtn/data/Serializer.h> 00020 #include <ibrdtn/data/Bundle.h> 00021 #include <ibrdtn/data/BundleID.h> 00022 00023 #include <ibrcommon/thread/MutexLock.h> 00024 #include <ibrcommon/data/BLOB.h> 00025 #include <ibrcommon/AutoDelete.h> 00026 #include <ibrcommon/Logger.h> 00027 00028 namespace dtn 00029 { 00030 namespace core 00031 { 00032 void sql_tracer(void*, const char * pQuery) 00033 { 00034 IBRCOMMON_LOGGER_DEBUG(50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL; 00035 } 00036 00037 const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] = 00038 { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes" }; 00039 00040 const std::string SQLiteBundleStorage::_sql_queries[SQL_QUERIES_END] = 00041 { 00042 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC LIMIT ?,?;", 00043 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL LIMIT 1;", 00044 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;", 00045 "SELECT * FROM " + _tables[SQL_TABLE_BUNDLE] + " WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset != NULL ORDER BY fragmentoffset ASC;", 00046 00047 "SELECT source, timestamp, sequencenumber, fragmentoffset, procflags FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;", 00048 "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.expiretime <= ?;", 00049 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;", 00050 "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;", 00051 00052 "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;", 00053 "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";", 00054 00055 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;", 00056 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;", 00057 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";", 00058 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);", 00059 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;", 00060 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;", 00061 00062 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;", 00063 00064 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL ORDER BY ordernumber ASC;", 00065 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;", 00066 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL AND ordernumber = ?;", 00067 "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;", 00068 "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";", 00069 "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);", 00070 00071 #ifdef SQLITE_STORAGE_EXTENDED 00072 "SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;", 00073 "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;", 00074 "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";", 00075 "INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);", 00076 00077 "SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;", 00078 "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;", 00079 "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";", 00080 "INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);", 00081 00082 "SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;", 00083 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;", 00084 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);", 00085 #endif 00086 00087 "VACUUM;" 00088 }; 00089 00090 // const std::string SQLiteBundleStorage::_db_structure[11] = 00091 // { 00092 // "create table if not exists "+ _tables[SQL_TABLE_BUNDLE] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, Source text, Destination text, Reportto text, Custodian text, ProcFlags int, Timestamp int, Sequencenumber int, Lifetime int, Fragmentoffset int, Appdatalength int, TTL int, Size int, Payloadlength int, Payloadfilename text);", 00093 // "create table if not exists "+ _tables[SQL_TABLE_BLOCK] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, BlockType int, Filename text, Blocknumber int);", 00094 // "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);", 00095 // "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);", 00096 // "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);", 00097 // "CREATE TRIGGER IF NOT EXISTS Blockdelete AFTER DELETE ON "+ _tables[SQL_TABLE_BUNDLE] +" FOR EACH ROW BEGIN DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE BundleID = OLD.BundleID; DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = OLD.BundleID; END;", 00098 // "CREATE TRIGGER IF NOT EXISTS BundleRoutingdelete BEFORE INSERT ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" BEGIN SELECT CASE WHEN (SELECT BundleID FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = NEW.BundleID) IS NULL THEN RAISE(ABORT, \"Foreign Key Violation: BundleID doesn't exist in BundleTable\")END; END;", 00099 // "CREATE INDEX IF NOT EXISTS BlockIDIndex ON "+ _tables[SQL_TABLE_BLOCK] +" (BundleID);", 00100 // "CREATE UNIQUE INDEX IF NOT EXISTS BundleIDIndex ON "+ _tables[SQL_TABLE_BUNDLE] +" (BundleID);", 00101 // "CREATE UNIQUE INDEX IF NOT EXISTS RoutingKeyIndex ON "+ _tables[SQL_TABLE_ROUTING] +" (Key);", 00102 // "CREATE UNIQUE INDEX IF NOT EXISTS BundleRoutingIDIndex ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID,Key);", 00103 // }; 00104 00105 const std::string SQLiteBundleStorage::_db_structure[10] = 00106 { 00107 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);", 00108 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `appdatalength` INTEGER DEFAULT NULL, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL);", 00109 "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);", 00110 "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);", 00111 "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);", 00112 "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND ((" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset IS NULL AND old.fragmentoffset IS NULL) OR (" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset)); END;", 00113 "CREATE UNIQUE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);", 00114 "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);", 00115 "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);", 00116 "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);" 00117 "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);" 00118 }; 00119 00120 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex; 00121 bool SQLiteBundleStorage::TaskIdle::_idle = false; 00122 00123 SQLiteBundleStorage::SQLBundleQuery::SQLBundleQuery() 00124 : _statement(NULL) 00125 { } 00126 00127 SQLiteBundleStorage::SQLBundleQuery::~SQLBundleQuery() 00128 { 00129 // free the statement if used before 00130 if (_statement != NULL) 00131 { 00132 sqlite3_finalize(_statement); 00133 } 00134 } 00135 00136 SQLiteBundleStorage::AutoResetLock::AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st) 00137 : _lock(mutex), _st(st) 00138 { 00139 //sqlite3_clear_bindings(st); 00140 } 00141 00142 SQLiteBundleStorage::AutoResetLock::~AutoResetLock() 00143 { 00144 sqlite3_reset(_st); 00145 } 00146 00147 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size) 00148 : dbPath(path), dbFile(path.get("sqlite.db")), dbSize(size), _next_expiration(0) 00149 { 00150 //Configure SQLite Library 00151 SQLiteConfigure::configure(); 00152 00153 // check if SQLite is thread-safe 00154 if (sqlite3_threadsafe() == 0) 00155 { 00156 IBRCOMMON_LOGGER(critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL; 00157 throw ibrcommon::Exception("need threading support in sqlite!"); 00158 } 00159 } 00160 00161 SQLiteBundleStorage::~SQLiteBundleStorage() 00162 { 00163 stop(); 00164 join(); 00165 00166 ibrcommon::MutexLock l(*this); 00167 00168 // free all statements 00169 for (int i = 0; i < SQL_QUERIES_END; i++) 00170 { 00171 // prepare the statement 00172 sqlite3_finalize(_statements[i]); 00173 } 00174 00175 //close Databaseconnection 00176 if (sqlite3_close(_database) != SQLITE_OK) 00177 { 00178 IBRCOMMON_LOGGER(error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL; 00179 } 00180 00181 // shutdown sqlite library 00182 SQLiteConfigure::shutdown(); 00183 } 00184 00185 void SQLiteBundleStorage::openDatabase(const ibrcommon::File &path) 00186 { 00187 ibrcommon::MutexLock l(*this); 00188 00189 // set the block path 00190 _blockPath = dbPath.get(_tables[SQL_TABLE_BLOCK]); 00191 00192 //open the database 00193 if (sqlite3_open_v2(path.getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) 00194 { 00195 IBRCOMMON_LOGGER(error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL; 00196 sqlite3_close(_database); 00197 throw ibrcommon::Exception("Unable to open sqlite database"); 00198 } 00199 00200 int err = 0; 00201 00202 // create all tables 00203 for (size_t i = 0; i < 10; i++) 00204 { 00205 sqlite3_stmt *st = prepare(_db_structure[i]); 00206 err = sqlite3_step(st); 00207 if(err != SQLITE_DONE) 00208 { 00209 IBRCOMMON_LOGGER(error) << "SQLiteBundleStorage: failed to create database" << err << IBRCOMMON_LOGGER_ENDL; 00210 } 00211 sqlite3_reset(st); 00212 sqlite3_finalize(st); 00213 } 00214 00215 // create the bundle folder 00216 ibrcommon::File::createDirectory( _blockPath ); 00217 00218 // prepare all statements 00219 for (int i = 0; i < SQL_QUERIES_END; i++) 00220 { 00221 // prepare the statement 00222 int err = sqlite3_prepare_v2(_database, _sql_queries[i].c_str(), _sql_queries[i].length(), &_statements[i], 0); 00223 if ( err != SQLITE_OK ) 00224 { 00225 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failed to prepare statement: " << err << " with query: " << _sql_queries[i] << IBRCOMMON_LOGGER_ENDL; 00226 } 00227 } 00228 00229 // disable synchronous mode 00230 sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL); 00231 00232 // enable sqlite tracing if debug level is higher than 50 00233 if (IBRCOMMON_LOGGER_LEVEL >= 50) 00234 { 00235 sqlite3_trace(_database, &sql_tracer, NULL); 00236 } 00237 } 00238 00239 void SQLiteBundleStorage::componentRun() 00240 { 00241 // loop until aborted 00242 try { 00243 while (true) 00244 { 00245 Task *t = _tasks.getnpop(true); 00246 00247 try { 00248 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t); 00249 try { 00250 btask.run(*this); 00251 } catch (const std::exception&) { 00252 btask.abort(); 00253 continue; 00254 }; 00255 btask.done(); 00256 continue; 00257 } catch (const std::bad_cast&) { }; 00258 00259 try { 00260 ibrcommon::AutoDelete<Task> killer(t); 00261 t->run(*this); 00262 } catch (const std::exception&) { }; 00263 } 00264 } catch (const ibrcommon::QueueUnblockedException &ex) { 00265 // we are aborted, abort all blocking tasks 00266 } 00267 } 00268 00269 void SQLiteBundleStorage::componentUp() 00270 { 00271 //register Events 00272 bindEvent(TimeEvent::className); 00273 bindEvent(GlobalEvent::className); 00274 00275 // open the database and create all folders and files if needed 00276 openDatabase(dbFile); 00277 00278 //Check if the database is consistent with the filesystem 00279 check_consistency(); 00280 00281 // calculate next Bundleexpiredtime 00282 update_expire_time(); 00283 }; 00284 00285 void SQLiteBundleStorage::componentDown() 00286 { 00287 //unregister Events 00288 unbindEvent(TimeEvent::className); 00289 unbindEvent(GlobalEvent::className); 00290 }; 00291 00292 bool SQLiteBundleStorage::__cancellation() 00293 { 00294 _tasks.abort(); 00295 return true; 00296 } 00297 00298 void SQLiteBundleStorage::check_consistency() 00299 { 00300 /* 00301 * check if the database is consistent with the files on the HDD. Therefore every file of the database is put in a list. 00302 * When the files of the database are scanned it is checked if the actual file is in the list of files of the filesystem. if it is so the entry 00303 * of the list of files of the filesystem is deleted. if not the file is put in a seperate list. After this procedure there are 2 lists containing file 00304 * which are inconsistent and can be deleted. 00305 */ 00306 int err; 00307 size_t timestamp, sequencenumber, offset, pFlags, appdata, lifetime; 00308 set<string> blockFiles,payloadfiles; 00309 string datei; 00310 00311 list<ibrcommon::File> blist; 00312 list<ibrcommon::File>::iterator file_it; 00313 00314 //1. Durchsuche die Dateien 00315 _blockPath.getFiles(blist); 00316 for(file_it = blist.begin(); file_it != blist.end(); file_it++) 00317 { 00318 datei = (*file_it).getPath(); 00319 if(datei[datei.size()-1] != '.') 00320 { 00321 if(((*file_it).getBasename())[0] == 'b') 00322 { 00323 blockFiles.insert(datei); 00324 } 00325 else 00326 payloadfiles.insert(datei); 00327 } 00328 } 00329 00330 //3. Check consistency of the bundles 00331 check_bundles(blockFiles); 00332 00333 //4. Check consistency of the fragments 00334 check_fragments(payloadfiles); 00335 } 00336 00337 void SQLiteBundleStorage::check_fragments(std::set<std::string> &payloadFiles) 00338 { 00339 int err; 00340 size_t procFlags, timestamp, sequencenumber, appdatalength, lifetime; 00341 string filename, source, dest, custody, repto; 00342 set<string> consistenFiles, inconsistenSources; 00343 set<size_t> inconsistentTimestamp, inconsistentSeq_number; 00344 set<string>::iterator file_it, consisten_it; 00345 00346 sqlite3_stmt *getPayloadfiles = prepare("SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+_tables[SQL_TABLE_BUNDLE]+" WHERE fragmentoffset != NULL;"); 00347 00348 for(err = sqlite3_step(getPayloadfiles); err == SQLITE_ROW; err = sqlite3_step(getPayloadfiles)) 00349 { 00350 filename = (const char*)sqlite3_column_text(getPayloadfiles,10); 00351 file_it = payloadFiles.find(filename); 00352 00353 00354 if (file_it == payloadFiles.end()) 00355 { 00356 consisten_it = consistenFiles.find(*file_it); 00357 00358 //inconsistent DB entry 00359 if(consisten_it == consistenFiles.end()){ 00360 //Generate Report 00361 source = (const char*) sqlite3_column_text(getPayloadfiles,0); 00362 dest = (const char*) sqlite3_column_text(getPayloadfiles,1); 00363 repto = (const char*) sqlite3_column_text(getPayloadfiles,2); 00364 custody = (const char*) sqlite3_column_text(getPayloadfiles,3); 00365 procFlags = sqlite3_column_int64(getPayloadfiles,4); 00366 timestamp = sqlite3_column_int64(getPayloadfiles,5); 00367 sequencenumber = sqlite3_column_int64(getPayloadfiles,6); 00368 lifetime = sqlite3_column_int64(getPayloadfiles,7); 00369 appdatalength = sqlite3_column_int64(getPayloadfiles,9); 00370 00371 dtn::data::BundleID id(dtn::data::EID(source),timestamp,sequencenumber); 00372 dtn::data::MetaBundle mb(id, lifetime, dtn::data::DTNTime(), dtn::data::EID(dest), dtn::data::EID(repto), dtn::data::EID(custody), appdatalength, procFlags); 00373 dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE); 00374 } 00375 } 00376 //consistent DB entry 00377 else 00378 consistenFiles.insert(*file_it); 00379 payloadFiles.erase(file_it); 00380 } 00381 00382 sqlite3_reset(getPayloadfiles); 00383 sqlite3_finalize(getPayloadfiles); 00384 } 00385 00386 void SQLiteBundleStorage::check_bundles(std::set<std::string> &blockFiles) 00387 { 00388 std::set<dtn::data::BundleID> corrupt_bundle_ids; 00389 00390 sqlite3_stmt *blockConistencyCheck = prepare("SELECT source_id, timestamp, sequencenumber, fragmentoffset, filename, ordernumber FROM "+ _tables[SQL_TABLE_BLOCK] +";"); 00391 00392 while (sqlite3_step(blockConistencyCheck) == SQLITE_ROW) 00393 { 00394 dtn::data::BundleID id; 00395 00396 // get the bundleid 00397 get_bundleid(blockConistencyCheck, id); 00398 00399 // get the filename 00400 std::string filename = (const char*)sqlite3_column_text(blockConistencyCheck, 4); 00401 00402 // if the filename is not in the list of block files 00403 if (blockFiles.find(filename) == blockFiles.end()) 00404 { 00405 // add the bundle to the deletion list 00406 corrupt_bundle_ids.insert(id); 00407 } 00408 else 00409 { 00410 // file is available, everything is fine 00411 blockFiles.erase(filename); 00412 } 00413 } 00414 sqlite3_reset(blockConistencyCheck); 00415 sqlite3_finalize(blockConistencyCheck); 00416 00417 for(std::set<dtn::data::BundleID>::const_iterator iter = corrupt_bundle_ids.begin(); iter != corrupt_bundle_ids.end(); iter++) 00418 { 00419 try { 00420 const dtn::data::BundleID &id = (*iter); 00421 00422 // get meta data of this bundle 00423 const dtn::data::MetaBundle m = get_meta(id); 00424 00425 // remove the hole bundle 00426 remove(id); 00427 } catch (const dtn::core::SQLiteBundleStorage::SQLiteQueryException&) { }; 00428 } 00429 00430 // delete block files still in the blockfile list 00431 for (std::set<std::string>::const_iterator iter = blockFiles.begin(); iter != blockFiles.end(); iter++) 00432 { 00433 ibrcommon::File blockfile(*iter); 00434 blockfile.remove(); 00435 } 00436 } 00437 00438 00439 sqlite3_stmt* SQLiteBundleStorage::prepare(const std::string &sqlquery) 00440 { 00441 // prepare the statement 00442 sqlite3_stmt *statement; 00443 int err = sqlite3_prepare_v2(_database, sqlquery.c_str(), sqlquery.length(), &statement, 0); 00444 if ( err != SQLITE_OK ) 00445 { 00446 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << IBRCOMMON_LOGGER_ENDL; 00447 } 00448 return statement; 00449 } 00450 00451 dtn::data::MetaBundle SQLiteBundleStorage::get_meta(const dtn::data::BundleID &id) 00452 { 00453 dtn::data::MetaBundle bundle; 00454 00455 // check if the bundle is already on the deletion list 00456 if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException(); 00457 00458 size_t stmt_key = BUNDLE_GET_ID; 00459 if (id.fragment) stmt_key = FRAGMENT_GET_ID; 00460 00461 // lock the prepared statement 00462 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]); 00463 00464 // bind bundle id to the statement 00465 set_bundleid(_statements[stmt_key], id); 00466 00467 // execute the query and check for error 00468 if (sqlite3_step(_statements[stmt_key]) != SQLITE_ROW) 00469 { 00470 stringstream error; 00471 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString(); 00472 IBRCOMMON_LOGGER_DEBUG(15) << error.str() << IBRCOMMON_LOGGER_ENDL; 00473 throw SQLiteQueryException(error.str()); 00474 } 00475 00476 // query bundle data 00477 get(_statements[stmt_key], bundle); 00478 00479 return bundle; 00480 } 00481 00482 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset) 00483 { 00484 bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) ); 00485 bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) ); 00486 bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) ); 00487 bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) ); 00488 bundle.procflags = sqlite3_column_int(st, offset + 4); 00489 bundle.timestamp = sqlite3_column_int64(st, offset + 5); 00490 bundle.sequencenumber = sqlite3_column_int64(st, offset + 6); 00491 bundle.lifetime = sqlite3_column_int64(st, offset + 7); 00492 00493 if (bundle.procflags & data::Bundle::FRAGMENT) 00494 { 00495 bundle.offset = sqlite3_column_int64(st, offset + 8); 00496 bundle.appdatalength = sqlite3_column_int64(st, offset + 9); 00497 } 00498 } 00499 00500 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset) 00501 { 00502 bundle._source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) ); 00503 bundle._destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) ); 00504 bundle._reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) ); 00505 bundle._custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) ); 00506 bundle._procflags = sqlite3_column_int(st, offset + 4); 00507 bundle._timestamp = sqlite3_column_int64(st, offset + 5); 00508 bundle._sequencenumber = sqlite3_column_int64(st, offset + 6); 00509 bundle._lifetime = sqlite3_column_int64(st, offset + 7); 00510 00511 if (bundle._procflags & data::Bundle::FRAGMENT) 00512 { 00513 bundle._fragmentoffset = sqlite3_column_int64(st, offset + 8); 00514 bundle._appdatalength = sqlite3_column_int64(st, offset + 9); 00515 } 00516 } 00517 00518 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb) 00519 { 00520 std::list<dtn::data::MetaBundle> ret; 00521 00522 const std::string base_query = 00523 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE]; 00524 00525 sqlite3_stmt *st = NULL; 00526 size_t bind_offset = 1; 00527 00528 try { 00529 SQLBundleQuery &query = dynamic_cast<SQLBundleQuery&>(cb); 00530 00531 if (query._statement == NULL) 00532 { 00533 std::string sql = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC"; 00534 00535 if (cb.limit() > 0) 00536 { 00537 sql += " LIMIT ?,?"; 00538 } 00539 00540 // add closing delimiter 00541 sql += ";"; 00542 00543 // prepare the hole statement 00544 query._statement = prepare(sql); 00545 } 00546 00547 st = query._statement; 00548 bind_offset = query.bind(st, 1); 00549 } catch (const std::bad_cast&) { 00550 // this query is not optimized for sql, use the generic way (slow) 00551 st = _statements[BUNDLE_GET_FILTER]; 00552 }; 00553 00554 size_t offset = 0; 00555 while (true) 00556 { 00557 // lock the database 00558 AutoResetLock l(_locks[BUNDLE_GET_FILTER], st); 00559 00560 if (cb.limit() > 0) 00561 { 00562 sqlite3_bind_int64(st, bind_offset, offset); 00563 sqlite3_bind_int64(st, bind_offset + 1, cb.limit()); 00564 } 00565 00566 if (sqlite3_step(st) == SQLITE_DONE) 00567 { 00568 // returned no result 00569 break; 00570 } 00571 00572 while (true) 00573 { 00574 dtn::data::MetaBundle m; 00575 00576 // extract the primary values and set them in the bundle object 00577 get(st, m, 0); 00578 00579 // check if the bundle is already on the deletion list 00580 if (!contains_deletion(m)) 00581 { 00582 // ask the filter if this bundle should be added to the return list 00583 if (cb.shouldAdd(m)) 00584 { 00585 IBRCOMMON_LOGGER_DEBUG(40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL; 00586 00587 // add the bundle to the list 00588 ret.push_back(m); 00589 } 00590 } 00591 00592 if (sqlite3_step(st) != SQLITE_ROW) 00593 { 00594 break; 00595 } 00596 00597 // abort if enough bundles are found 00598 if ((cb.limit() > 0) && (ret.size() >= cb.limit())) break; 00599 } 00600 00601 // if a limit is set 00602 if (cb.limit() > 0) 00603 { 00604 // increment the offset, because we might not have enough 00605 offset += cb.limit(); 00606 00607 // abort if enough bundles are found 00608 if (ret.size() >= cb.limit()) break; 00609 } 00610 else 00611 { 00612 break; 00613 } 00614 } 00615 00616 return ret; 00617 } 00618 00619 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id) 00620 { 00621 dtn::data::Bundle bundle; 00622 int err = 0; 00623 00624 IBRCOMMON_LOGGER_DEBUG(25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL; 00625 00626 // if bundle is deleted? 00627 if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException(); 00628 00629 size_t stmt_key = BUNDLE_GET_ID; 00630 if (id.fragment) stmt_key = FRAGMENT_GET_ID; 00631 00632 // do this while db is locked 00633 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]); 00634 00635 // set the bundle key values 00636 set_bundleid(_statements[stmt_key], id); 00637 00638 // execute the query and check for error 00639 if ((err = sqlite3_step(_statements[stmt_key])) != SQLITE_ROW) 00640 { 00641 IBRCOMMON_LOGGER_DEBUG(15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL; 00642 throw dtn::core::BundleStorage::NoBundleFoundException(); 00643 } 00644 00645 // read bundle data 00646 get(_statements[stmt_key], bundle); 00647 00648 try { 00649 // read all blocks 00650 get_blocks(bundle, id); 00651 } catch (const ibrcommon::Exception &ex) { 00652 IBRCOMMON_LOGGER(error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00653 throw dtn::core::BundleStorage::NoBundleFoundException(); 00654 } 00655 00656 return bundle; 00657 } 00658 00659 #ifdef SQLITE_STORAGE_EXTENDED 00660 void SQLiteBundleStorage::setPriority(const int priority, const dtn::data::BundleID &id) 00661 { 00662 int err; 00663 size_t procflags; 00664 { 00665 sqlite3_bind_text(_statements[PROCFLAGS_GET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT); 00666 err = sqlite3_step(_statements[PROCFLAGS_GET]); 00667 if(err == SQLITE_ROW) 00668 procflags = sqlite3_column_int(_statements[PROCFLAGS_GET],0); 00669 sqlite3_reset(_statements[PROCFLAGS_GET]); 00670 if(err != SQLITE_DONE){ 00671 stringstream error; 00672 error << "SQLiteBundleStorage: error while Select Querry: " << err; 00673 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00674 throw SQLiteQueryException(error.str()); 00675 } 00676 00677 //Priority auf 0 Setzen 00678 procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2)); 00679 00680 //Set Prioritybits 00681 switch(priority){ 00682 case 1: procflags += data::Bundle::PRIORITY_BIT1; break; //Set PRIORITY_BIT1 00683 case 2: procflags += data::Bundle::PRIORITY_BIT2; break; 00684 case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break; 00685 } 00686 00687 sqlite3_bind_text(_statements[PROCFLAGS_SET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT); 00688 sqlite3_bind_int64(_statements[PROCFLAGS_SET],2,priority); 00689 sqlite3_step(_statements[PROCFLAGS_SET]); 00690 sqlite3_reset(_statements[PROCFLAGS_SET]); 00691 00692 if(err != SQLITE_DONE) 00693 { 00694 stringstream error; 00695 error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err; 00696 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00697 throw SQLiteQueryException(error.str()); 00698 } 00699 } 00700 } 00701 #endif 00702 00703 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle) 00704 { 00705 IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL; 00706 00707 // determine if the bundle is for local delivery 00708 bool local = (bundle._destination.sameHost(BundleCore::local) 00709 && (bundle._procflags & dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)); 00710 00711 // // if the bundle is a fragment store it as one 00712 // if (bundle.get(dtn::data::Bundle::FRAGMENT) && local) 00713 // { 00714 // storeFragment(bundle); 00715 // return; 00716 // } 00717 00718 int err; 00719 00720 const dtn::data::EID _sourceid = bundle._source; 00721 size_t TTL = bundle._timestamp + bundle._lifetime; 00722 00723 AutoResetLock l(_locks[BUNDLE_STORE], _statements[BUNDLE_STORE]); 00724 00725 set_bundleid(_statements[BUNDLE_STORE], bundle); 00726 00727 sqlite3_bind_text(_statements[BUNDLE_STORE], 5, bundle._source.getString().c_str(), bundle._source.getString().length(), SQLITE_TRANSIENT); 00728 sqlite3_bind_text(_statements[BUNDLE_STORE], 6, bundle._destination.getString().c_str(), bundle._destination.getString().length(), SQLITE_TRANSIENT); 00729 sqlite3_bind_text(_statements[BUNDLE_STORE], 7, bundle._reportto.getString().c_str(), bundle._reportto.getString().length(), SQLITE_TRANSIENT); 00730 sqlite3_bind_text(_statements[BUNDLE_STORE], 8, bundle._custodian.getString().c_str(), bundle._custodian.getString().length(), SQLITE_TRANSIENT); 00731 sqlite3_bind_int(_statements[BUNDLE_STORE], 9, bundle._procflags); 00732 sqlite3_bind_int64(_statements[BUNDLE_STORE], 10, bundle._lifetime); 00733 00734 if (bundle.get(dtn::data::Bundle::FRAGMENT)) 00735 { 00736 sqlite3_bind_int64(_statements[BUNDLE_STORE], 11, bundle._appdatalength); 00737 } 00738 else 00739 { 00740 sqlite3_bind_null(_statements[BUNDLE_STORE], 4); 00741 sqlite3_bind_null(_statements[BUNDLE_STORE], 11); 00742 } 00743 00744 sqlite3_bind_int64(_statements[BUNDLE_STORE], 12, TTL); 00745 sqlite3_bind_int64(_statements[BUNDLE_STORE], 13, dtn::data::MetaBundle(bundle).getPriority()); 00746 00747 // start a transaction 00748 sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, NULL); 00749 00750 // store the bundle data in the database 00751 err = sqlite3_step(_statements[BUNDLE_STORE]); 00752 00753 if (err == SQLITE_CONSTRAINT) 00754 { 00755 IBRCOMMON_LOGGER(warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL; 00756 } 00757 else if (err != SQLITE_DONE) 00758 { 00759 stringstream error; 00760 error << "SQLiteBundleStorage: store() failure: " << err << " " << sqlite3_errmsg(_database); 00761 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00762 00763 // rollback the whole transaction 00764 sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, NULL); 00765 00766 throw SQLiteQueryException(error.str()); 00767 } 00768 else 00769 { 00770 // stores the blocks to a file 00771 store_blocks(bundle); 00772 00773 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL; 00774 } 00775 00776 // commit the transaction 00777 sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, NULL); 00778 00779 // set new expire time 00780 new_expire_time(TTL); 00781 00782 try { 00783 // the bundle is stored sucessfully, we could accept custody if it is requested 00784 const dtn::data::EID custodian = acceptCustody(bundle); 00785 00786 // update the custody address of this bundle 00787 update_custodian(bundle, custodian); 00788 } catch (const ibrcommon::Exception&) { 00789 // this bundle has no request for custody transfers 00790 } 00791 } 00792 00793 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id) 00794 { 00795 add_deletion(id); 00796 _tasks.push(new TaskRemove(id)); 00797 } 00798 00799 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage) 00800 { 00801 { 00802 // select the right statement to use 00803 const size_t stmt_key = _id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID; 00804 00805 // lock the database 00806 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]); 00807 00808 // set the bundle key values 00809 storage.set_bundleid(storage._statements[stmt_key], _id); 00810 00811 // step through all blocks 00812 while (sqlite3_step(storage._statements[stmt_key]) == SQLITE_ROW) 00813 { 00814 // delete each referenced block file 00815 ibrcommon::File blockfile( (const char*)sqlite3_column_text(storage._statements[stmt_key], 0) ); 00816 blockfile.remove(); 00817 } 00818 } 00819 00820 { 00821 const size_t stmt_key = _id.fragment ? FRAGMENT_DELETE : BUNDLE_DELETE; 00822 00823 // lock the database 00824 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]); 00825 00826 // then remove the bundle data 00827 storage.set_bundleid(storage._statements[stmt_key], _id); 00828 sqlite3_step(storage._statements[stmt_key]); 00829 00830 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << _id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL; 00831 } 00832 00833 //update deprecated timer 00834 storage.update_expire_time(); 00835 00836 // remove it from the deletion list 00837 storage.remove_deletion(_id); 00838 } 00839 00840 #ifdef SQLITE_STORAGE_EXTENDED 00841 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key) 00842 { 00843 int err; 00844 string result; 00845 00846 sqlite3_bind_text(_statements[ROUTING_GET],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT); 00847 sqlite3_bind_int(_statements[ROUTING_GET],2,key); 00848 err = sqlite3_step(_statements[ROUTING_GET]); 00849 if(err == SQLITE_ROW){ 00850 result = (const char*) sqlite3_column_text(_statements[ROUTING_GET],0); 00851 } 00852 sqlite3_reset(_statements[ROUTING_GET]); 00853 if(err != SQLITE_DONE && err != SQLITE_ROW){ 00854 stringstream error; 00855 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err; 00856 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00857 throw SQLiteQueryException(error.str()); 00858 } 00859 00860 return result; 00861 } 00862 00863 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key) 00864 { 00865 int err; 00866 string result; 00867 00868 err = sqlite3_bind_text(_statements[NODE_GET],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT); 00869 err = sqlite3_bind_int(_statements[NODE_GET],2,key); 00870 err = sqlite3_step(_statements[NODE_GET]); 00871 if(err == SQLITE_ROW){ 00872 result = (const char*) sqlite3_column_text(_statements[NODE_GET],0); 00873 } 00874 sqlite3_reset(_statements[NODE_GET]); 00875 if(err != SQLITE_DONE && err != SQLITE_ROW){ 00876 stringstream error; 00877 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database); 00878 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00879 throw SQLiteQueryException(error.str()); 00880 } 00881 00882 return result; 00883 } 00884 00885 std::string SQLiteBundleStorage::getRoutingInfo(const int &key) 00886 { 00887 int err; 00888 string result; 00889 00890 sqlite3_bind_int(_statements[INFO_GET],1,key); 00891 err = sqlite3_step(_statements[INFO_GET]); 00892 if(err == SQLITE_ROW){ 00893 result = (const char*) sqlite3_column_text(_statements[INFO_GET],0); 00894 } 00895 sqlite3_reset(_statements[INFO_GET]); 00896 if(err != SQLITE_DONE && err != SQLITE_ROW){ 00897 stringstream error; 00898 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database); 00899 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00900 throw SQLiteQueryException(error.str()); 00901 } 00902 00903 return result; 00904 } 00905 00906 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo) 00907 { 00908 int err; 00909 00910 sqlite3_bind_text(_statements[ROUTING_STORE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT); 00911 sqlite3_bind_int(_statements[ROUTING_STORE],2,key); 00912 sqlite3_bind_text(_statements[ROUTING_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT); 00913 err = sqlite3_step(_statements[ROUTING_STORE]); 00914 sqlite3_reset(_statements[ROUTING_STORE]); 00915 if(err == SQLITE_CONSTRAINT) 00916 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl; 00917 else if(err != SQLITE_DONE){ 00918 stringstream error; 00919 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err; 00920 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00921 throw SQLiteQueryException(error.str()); 00922 } 00923 } 00924 00925 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo) 00926 { 00927 int err; 00928 00929 sqlite3_bind_text(_statements[NODE_STORE],1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT); 00930 sqlite3_bind_int(_statements[NODE_STORE],2,key); 00931 sqlite3_bind_text(_statements[NODE_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT); 00932 err = sqlite3_step(_statements[NODE_STORE]); 00933 sqlite3_reset(_statements[NODE_STORE]); 00934 if(err == SQLITE_CONSTRAINT) 00935 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl; 00936 else if(err != SQLITE_DONE){ 00937 stringstream error; 00938 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err; 00939 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00940 throw SQLiteQueryException(error.str()); 00941 } 00942 } 00943 00944 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo) 00945 { 00946 int err; 00947 00948 sqlite3_bind_int(_statements[INFO_STORE],1,key); 00949 sqlite3_bind_text(_statements[INFO_STORE],2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT); 00950 err = sqlite3_step(_statements[INFO_STORE]); 00951 sqlite3_reset(_statements[INFO_STORE]); 00952 if(err == SQLITE_CONSTRAINT) 00953 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl; 00954 else if(err != SQLITE_DONE){ 00955 stringstream error; 00956 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err; 00957 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00958 throw SQLiteQueryException(error.str()); 00959 } 00960 } 00961 00962 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key) 00963 { 00964 int err; 00965 00966 sqlite3_bind_text(_statements[ROUTING_REMOVE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT); 00967 sqlite3_bind_int(_statements[ROUTING_REMOVE],2,key); 00968 err = sqlite3_step(_statements[ROUTING_REMOVE]); 00969 sqlite3_reset(_statements[ROUTING_REMOVE]); 00970 if(err != SQLITE_DONE){ 00971 stringstream error; 00972 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err; 00973 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00974 throw SQLiteQueryException(error.str()); 00975 } 00976 } 00977 00978 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key) 00979 { 00980 int err; 00981 00982 sqlite3_bind_text(_statements[NODE_REMOVE],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT); 00983 sqlite3_bind_int(_statements[NODE_REMOVE],2,key); 00984 err = sqlite3_step(_statements[NODE_REMOVE]); 00985 sqlite3_reset(_statements[NODE_REMOVE]); 00986 if(err != SQLITE_DONE){ 00987 stringstream error; 00988 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err; 00989 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00990 throw SQLiteQueryException(error.str()); 00991 } 00992 } 00993 00994 void SQLiteBundleStorage::removeRoutingInfo(const int &key) 00995 { 00996 int err; 00997 00998 sqlite3_bind_int(_statements[INFO_REMOVE],1,key); 00999 err = sqlite3_step(_statements[INFO_REMOVE]); 01000 sqlite3_reset(_statements[INFO_REMOVE]); 01001 if(err != SQLITE_DONE){ 01002 stringstream error; 01003 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err; 01004 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01005 throw SQLiteQueryException(error.str()); 01006 } 01007 } 01008 #endif 01009 01010 void SQLiteBundleStorage::clearAll() 01011 { 01012 #ifdef SQLITE_STORAGE_EXTENDED 01013 sqlite3_step(_statements[ROUTING_CLEAR]); 01014 sqlite3_reset(_statements[ROUTING_CLEAR]); 01015 sqlite3_step(_statements[NODE_CLEAR]); 01016 sqlite3_reset(_statements[NODE_CLEAR]); 01017 #endif 01018 01019 clear(); 01020 } 01021 01022 void SQLiteBundleStorage::clear() 01023 { 01024 AutoResetLock l(_locks[VACUUM], _statements[VACUUM]); 01025 01026 sqlite3_step(_statements[BUNDLE_CLEAR]); 01027 sqlite3_reset(_statements[BUNDLE_CLEAR]); 01028 sqlite3_step(_statements[BLOCK_CLEAR]); 01029 sqlite3_reset(_statements[BLOCK_CLEAR]); 01030 01031 if(SQLITE_DONE != sqlite3_step(_statements[VACUUM])) 01032 { 01033 sqlite3_reset(_statements[VACUUM]); 01034 throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed."); 01035 } 01036 01037 //Delete Folder SQL_TABLE_BLOCK containing Blocks 01038 { 01039 _blockPath.remove(true); 01040 ibrcommon::File::createDirectory(_blockPath); 01041 } 01042 01043 reset_expire_time(); 01044 } 01045 01046 01047 01048 bool SQLiteBundleStorage::empty() 01049 { 01050 AutoResetLock l(_locks[EMPTY_CHECK], _statements[EMPTY_CHECK]); 01051 01052 if (SQLITE_DONE == sqlite3_step(_statements[EMPTY_CHECK])) 01053 { 01054 return true; 01055 } 01056 else 01057 { 01058 return false; 01059 } 01060 } 01061 01062 unsigned int SQLiteBundleStorage::count() 01063 { 01064 int rows = 0; 01065 int err = 0; 01066 01067 AutoResetLock l(_locks[COUNT_ENTRIES], _statements[COUNT_ENTRIES]); 01068 01069 if ((err = sqlite3_step(_statements[COUNT_ENTRIES])) == SQLITE_ROW) 01070 { 01071 rows = sqlite3_column_int(_statements[COUNT_ENTRIES], 0); 01072 } 01073 else 01074 { 01075 stringstream error; 01076 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(_database); 01077 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01078 throw SQLiteQueryException(error.str()); 01079 } 01080 01081 return rows; 01082 } 01083 01084 #ifdef SQLITE_STORAGE_EXTENDED 01085 int SQLiteBundleStorage::occupiedSpace() 01086 { 01087 int size = 0; 01088 std::list<ibrcommon::File> files; 01089 01090 if (_blockPath.getFiles(files)) 01091 { 01092 IBRCOMMON_LOGGER(error) << "occupiedSpace: unable to open Directory " << _blockPath.getPath() << IBRCOMMON_LOGGER_ENDL; 01093 return -1; 01094 } 01095 01096 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++) 01097 { 01098 const ibrcommon::File &f = (*iter); 01099 01100 if (!f.isSystem()) 01101 { 01102 size += f.size(); 01103 } 01104 } 01105 01106 // add db file size 01107 size += dbFile.size(); 01108 01109 return size; 01110 } 01111 01112 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle) 01113 { 01114 int err = 0; 01115 size_t bitcounter = 0; 01116 bool allFragmentsReceived(true); 01117 size_t payloadsize, TTL, fragmentoffset; 01118 01119 ibrcommon::File payloadfile, filename; 01120 01121 // generate a bundle id string 01122 std::string bundleID = dtn::data::BundleID(bundle).toString(); 01123 01124 // get the destination id string 01125 std::string destination = bundle._destination.getString(); 01126 01127 // get the source id string 01128 std::string sourceEID = bundle._source.getString(); 01129 01130 // calculate the expiration timestamp 01131 TTL = bundle._timestamp + bundle._lifetime; 01132 01133 const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>(); 01134 ibrcommon::BLOB::Reference payloadBlob = block.getBLOB(); 01135 01136 // get the payload size 01137 { 01138 ibrcommon::BLOB::iostream io = payloadBlob.iostream(); 01139 payloadsize = io.size(); 01140 } 01141 01142 //Saves the blocks from the first and the last Fragment, they may contain Extentionblock 01143 bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength; 01144 bool first = bundle._fragmentoffset == 0; 01145 01146 if(first || last) 01147 { 01148 int i = 0, blocknumber = 0; 01149 01150 // get the list of blocks 01151 std::list<const dtn::data::Block*> blocklist = bundle.getBlocks(); 01152 01153 // iterate through all blocks 01154 for (std::list<const dtn::data::Block* >::const_iterator it = blocklist.begin(); it != blocklist.end(); it++, i++) 01155 { 01156 // generate a temporary block file 01157 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01158 01159 ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary); 01160 01161 dtn::data::SeparateSerializer serializer(datei); 01162 serializer << (*(*it)); 01163 datei.close(); 01164 01165 /* Schreibt Blöcke in die DB. 01166 * Die Blocknummern werden nach folgendem Schema in die DB geschrieben: 01167 * Die Blöcke des ersten Fragments fangen bei -x an und gehen bis -1 01168 * Die Blöcke des letzten Fragments beginnen bei 1 und gehen bis x 01169 * Der Payloadblock wird an Stelle 0 eingefügt. 01170 */ 01171 sqlite3_bind_text(_statements[BLOCK_STORE], 1, bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT); 01172 sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType())); 01173 sqlite3_bind_text(_statements[BLOCK_STORE], 3, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT); 01174 01175 blocknumber = blocklist.size() - i; 01176 01177 if (first) 01178 { 01179 blocknumber = (blocklist.size() - i)*(-1); 01180 } 01181 01182 sqlite3_bind_int(_statements[BLOCK_STORE],4,blocknumber); 01183 executeQuery(_statements[BLOCK_STORE]); 01184 } 01185 } 01186 01187 //At first the database is checked for already received bundles and some 01188 //informations are read, which effect the following program flow. 01189 sqlite3_bind_text(_statements[BUNDLE_GET_FRAGMENT],1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT); 01190 sqlite3_bind_int64(_statements[BUNDLE_GET_FRAGMENT],2,bundle._timestamp); 01191 sqlite3_bind_int(_statements[BUNDLE_GET_FRAGMENT],3,bundle._sequencenumber); 01192 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]); 01193 01194 //Es ist noch kein Eintrag vorhanden: generiere Payloadfilename 01195 if(err == SQLITE_DONE) 01196 { 01197 // generate a temporary block file 01198 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01199 payloadfile = tmpfile; 01200 } 01201 //Es ist bereits ein eintrag vorhanden: speicher den payloadfilename 01202 else if (err == SQLITE_ROW) 01203 { 01204 payloadfile = ibrcommon::File( (const char*)sqlite3_column_text(_statements[BUNDLE_GET_FRAGMENT], 14) ); //14 = Payloadfilename 01205 } 01206 01207 while (err == SQLITE_ROW){ 01208 //The following codepice summs the payloadfragmentbytes, to check if all fragments were received 01209 //and the bundle is complete. 01210 fragmentoffset = sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],9); // 9 = fragmentoffset 01211 01212 if(bitcounter >= fragmentoffset){ 01213 bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],13); //13 = payloadlength 01214 } 01215 else if(bitcounter >= bundle._fragmentoffset){ 01216 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize; 01217 } 01218 else{ 01219 allFragmentsReceived = false; 01220 } 01221 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]); 01222 } 01223 01224 if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived) //Wenn das aktuelle fragment das größte ist, ist die Liste durchlaufen und das Frag nicht dazugezählt. Dies wird hier nachgeholt. 01225 { 01226 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize; 01227 } 01228 01229 sqlite3_reset(_statements[BUNDLE_GET_FRAGMENT]); 01230 if(err != SQLITE_DONE){ 01231 stringstream error; 01232 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(_database); 01233 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01234 throw SQLiteQueryException(error.str()); 01235 } 01236 01237 //Save the Payload in a separate file 01238 std::fstream datei(payloadfile.getPath().c_str(), std::ios::in | std::ios::out | std::ios::binary); 01239 datei.seekp(bundle._fragmentoffset); 01240 { 01241 ibrcommon::BLOB::iostream io = payloadBlob.iostream(); 01242 01243 size_t ret = 0; 01244 istream &s = (*io); 01245 s.seekg(0); 01246 01247 while (true) 01248 { 01249 char buf; 01250 s.get(buf); 01251 if(!s.eof()){ 01252 datei.put(buf); 01253 ret++; 01254 } 01255 else 01256 break; 01257 } 01258 datei.close(); 01259 } 01260 01261 //All fragments received 01262 if(bundle._appdatalength == bitcounter) 01263 { 01264 /* 01265 * 1. Payloadblock zusammensetzen 01266 * 2. PayloadBlock in der BlockTable sichern 01267 * 3. Primary BlockInfos in die BundleTable eintragen. 01268 * 4. Einträge der Fragmenttabelle löschen 01269 */ 01270 01271 // 1. Get the payloadblockobject 01272 { 01273 ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(payloadfile)); 01274 dtn::data::PayloadBlock pb(ref); 01275 01276 //Copy EID list 01277 if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS)) 01278 { 01279 std::list<dtn::data::EID> eidlist; 01280 std::list<dtn::data::EID>::iterator eidIt; 01281 eidlist = block.getEIDList(); 01282 01283 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++) 01284 { 01285 pb.addEID(*eidIt); 01286 } 01287 } 01288 01289 //Copy ProcFlags 01290 pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT)); 01291 pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED)); 01292 pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED)); 01293 pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK)); 01294 pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED)); 01295 pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED)); 01296 pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS)); 01297 01298 //2. Save the PayloadBlock to a file and store the metainformation in the BlockTable 01299 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01300 01301 std::ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary); 01302 dtn::data::SeparateSerializer serializer(datei); 01303 serializer << (pb); 01304 datei.close(); 01305 01306 filename = tmpfile; 01307 } 01308 01309 sqlite3_bind_text(_statements[BLOCK_STORE], 1,bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT); 01310 // sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType())); 01311 sqlite3_bind_text(_statements[BLOCK_STORE], 3, filename.getPath().c_str(), filename.getPath().length(), SQLITE_TRANSIENT); 01312 sqlite3_bind_int(_statements[BLOCK_STORE], 4, 0); 01313 executeQuery(_statements[BLOCK_STORE]); 01314 01315 //3. Delete reassembled Payload data 01316 payloadfile.remove(); 01317 01318 //4. Remove Fragment entries from BundleTable 01319 sqlite3_bind_text(_statements[BUNDLE_DELETE],1,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT); 01320 sqlite3_bind_int64(_statements[BUNDLE_DELETE],2,bundle._sequencenumber); 01321 sqlite3_bind_int64(_statements[BUNDLE_DELETE],3,bundle._timestamp); 01322 executeQuery(_statements[BUNDLE_DELETE]); 01323 01324 //5. Write the Primary Block to the BundleTable 01325 size_t procFlags = bundle._procflags; 01326 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT); 01327 01328 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT); 01329 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT); 01330 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT); 01331 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT); 01332 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT); 01333 sqlite3_bind_int(_statements[BUNDLE_STORE],6,procFlags); 01334 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp); 01335 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber); 01336 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime); 01337 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset); 01338 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength); 01339 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL); 01340 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL); 01341 sqlite3_bind_int(_statements[BUNDLE_STORE],14,NULL); 01342 sqlite3_bind_text(_statements[BUNDLE_STORE],15,NULL,0,SQLITE_TRANSIENT); 01343 executeQuery(_statements[BUNDLE_STORE]); 01344 } 01345 01346 //There are some fragments missing 01347 else{ 01348 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT); 01349 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT); 01350 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT); 01351 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT); 01352 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT); 01353 sqlite3_bind_int(_statements[BUNDLE_STORE],6,bundle._procflags); 01354 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp); 01355 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber); 01356 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime); 01357 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset); 01358 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength); 01359 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL); 01360 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL); 01361 sqlite3_bind_int(_statements[BUNDLE_STORE],14,payloadsize); 01362 sqlite3_bind_text(_statements[BUNDLE_STORE],15,payloadfile.getPath().c_str(),payloadfile.getPath().length(),SQLITE_TRANSIENT); 01363 executeQuery(_statements[BUNDLE_STORE]); 01364 } 01365 } 01366 #endif 01367 01368 void SQLiteBundleStorage::raiseEvent(const Event *evt) 01369 { 01370 try { 01371 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt); 01372 01373 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 01374 { 01375 _tasks.push(new TaskExpire(time.getTimestamp())); 01376 } 01377 } catch (const std::bad_cast&) { } 01378 01379 try { 01380 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt); 01381 01382 if(global.getAction() == GlobalEvent::GLOBAL_IDLE) 01383 { 01384 // switch to idle mode 01385 ibrcommon::MutexLock l(TaskIdle::_mutex); 01386 TaskIdle::_idle = true; 01387 01388 // generate an idle task 01389 _tasks.push(new TaskIdle()); 01390 } 01391 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY) 01392 { 01393 // switch back to non-idle mode 01394 ibrcommon::MutexLock l(TaskIdle::_mutex); 01395 TaskIdle::_idle = false; 01396 } 01397 } catch (const std::bad_cast&) { } 01398 } 01399 01400 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage) 01401 { 01402 /* 01403 * Only if the actual time is bigger or equal than the time when the next bundle expires, deleteexpired is called. 01404 */ 01405 size_t exp_time = storage.get_expire_time(); 01406 if ((_timestamp < exp_time) || (exp_time == 0)) return; 01407 01408 /* 01409 * Performanceverbesserung: Damit die Abfragen nicht jede Sekunde ausgeführt werden müssen, speichert man den Zeitpunkt an dem 01410 * das nächste Bündel gelöscht werden soll in eine Variable und fürhrt deleteexpired erst dann aus wenn ein Bündel abgelaufen ist. 01411 * Nach dem Löschen wird die DB durchsucht und der nächste Ablaufzeitpunkt wird in die Variable gesetzt. 01412 */ 01413 01414 { 01415 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_FILENAMES], storage._statements[EXPIRE_BUNDLE_FILENAMES]); 01416 01417 // query for blocks of expired bundles 01418 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_FILENAMES], 1, _timestamp); 01419 while (sqlite3_step(storage._statements[EXPIRE_BUNDLE_FILENAMES]) == SQLITE_ROW) 01420 { 01421 ibrcommon::File block((const char*)sqlite3_column_text(storage._statements[EXPIRE_BUNDLE_FILENAMES],0)); 01422 block.remove(); 01423 } 01424 } 01425 01426 { 01427 AutoResetLock l(storage._locks[EXPIRE_BUNDLES], storage._statements[EXPIRE_BUNDLES]); 01428 01429 // query expired bundles 01430 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLES], 1, _timestamp); 01431 while (sqlite3_step(storage._statements[EXPIRE_BUNDLES]) == SQLITE_ROW) 01432 { 01433 dtn::data::BundleID id; 01434 storage.get_bundleid(storage._statements[EXPIRE_BUNDLES], id); 01435 dtn::core::BundleExpiredEvent::raise(id); 01436 } 01437 } 01438 01439 { 01440 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_DELETE], storage._statements[EXPIRE_BUNDLE_DELETE]); 01441 01442 // delete all expired db entries (bundles and blocks) 01443 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_DELETE], 1, _timestamp); 01444 sqlite3_step(storage._statements[EXPIRE_BUNDLE_DELETE]); 01445 } 01446 01447 //update deprecated timer 01448 storage.update_expire_time(); 01449 } 01450 01451 void SQLiteBundleStorage::update_expire_time() 01452 { 01453 ibrcommon::MutexLock l(_locks[EXPIRE_NEXT_TIMESTAMP]); 01454 int err = sqlite3_step(_statements[EXPIRE_NEXT_TIMESTAMP]); 01455 01456 if (err == SQLITE_ROW) 01457 { 01458 ibrcommon::MutexLock l(_next_expiration_lock); 01459 _next_expiration = sqlite3_column_int64(_statements[EXPIRE_NEXT_TIMESTAMP], 0); 01460 } 01461 else 01462 { 01463 ibrcommon::MutexLock l(_next_expiration_lock); 01464 _next_expiration = 0; 01465 } 01466 01467 sqlite3_reset(_statements[EXPIRE_NEXT_TIMESTAMP]); 01468 } 01469 01470 void SQLiteBundleStorage::update_custodian(const dtn::data::BundleID &id, const dtn::data::EID &custodian) 01471 { 01472 // select query with or without fragmentation extension 01473 STORAGE_STMT query = BUNDLE_UPDATE_CUSTODIAN; 01474 if (id.fragment) query = FRAGMENT_UPDATE_CUSTODIAN; 01475 01476 01477 AutoResetLock l(_locks[query], _statements[query]); 01478 01479 sqlite3_bind_text(_statements[query], 1, custodian.getString().c_str(), custodian.getString().length(), SQLITE_TRANSIENT); 01480 set_bundleid(_statements[query], id, 1); 01481 01482 // update the custodian in the database 01483 int err = sqlite3_step(_statements[query]); 01484 01485 if (err != SQLITE_DONE) 01486 { 01487 stringstream error; 01488 error << "SQLiteBundleStorage: update_custodian() failure: " << err << " " << sqlite3_errmsg(_database); 01489 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01490 } 01491 } 01492 01493 int SQLiteBundleStorage::store_blocks(const data::Bundle &bundle) 01494 { 01495 int blocktyp, blocknumber(1), storedBytes(0); 01496 01497 // get all blocks of the bundle 01498 const list<const data::Block*> blocklist = bundle.getBlocks(); 01499 01500 // generate a bundle id 01501 data::BundleID id(bundle); 01502 01503 for(std::list<const data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++) 01504 { 01505 blocktyp = (int)(*it)->getType(); 01506 01507 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01508 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary); 01509 01510 dtn::data::SeparateSerializer serializer(filestream); 01511 serializer << (*(*it)); 01512 storedBytes += serializer.getLength(*(*it)); 01513 01514 filestream.close(); 01515 01516 // protect this query from concurrent access and enable the auto-reset feature 01517 AutoResetLock l(_locks[BLOCK_STORE], _statements[BLOCK_STORE]); 01518 01519 // set bundle key data 01520 set_bundleid(_statements[BLOCK_STORE], id); 01521 01522 // set the column four to null if this is not a fragment 01523 if (!id.fragment) sqlite3_bind_null(_statements[BLOCK_STORE], 4); 01524 01525 // set the block type 01526 sqlite3_bind_int(_statements[BLOCK_STORE], 5, blocktyp); 01527 01528 // the filename of the block data 01529 sqlite3_bind_text(_statements[BLOCK_STORE], 6, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT); 01530 01531 // the ordering number 01532 sqlite3_bind_int(_statements[BLOCK_STORE], 7, blocknumber); 01533 01534 // execute the query and store the block in the database 01535 if (sqlite3_step(_statements[BLOCK_STORE]) != SQLITE_DONE) 01536 { 01537 throw SQLiteQueryException("can not store block of bundle"); 01538 } 01539 01540 //increment blocknumber 01541 blocknumber++; 01542 } 01543 01544 return storedBytes; 01545 } 01546 01547 void SQLiteBundleStorage::get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id) 01548 { 01549 int err = 0; 01550 string file; 01551 01552 // select the right statement to use 01553 const size_t stmt_key = id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID; 01554 01555 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]); 01556 01557 // set the bundle key values 01558 set_bundleid(_statements[stmt_key], id); 01559 01560 // query the database and step through all blocks 01561 while ((err = sqlite3_step(_statements[stmt_key])) == SQLITE_ROW) 01562 { 01563 const ibrcommon::File f( (const char*) sqlite3_column_text(_statements[stmt_key], 0) ); 01564 01565 // open the file 01566 std::ifstream is(f.getPath().c_str(), std::ios::binary | std::ios::in); 01567 01568 // read the block 01569 dtn::data::Block &block = dtn::data::SeparateDeserializer(is, bundle).readBlock(); 01570 01571 // close the file 01572 is.close(); 01573 01574 // modify the age block if present 01575 try { 01576 dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block); 01577 01578 // modify the AgeBlock with the age of the file 01579 time_t age = f.lastaccess() - f.lastmodify(); 01580 01581 agebl.addSeconds(age); 01582 } catch (const std::bad_cast&) { }; 01583 } 01584 01585 if (err == SQLITE_DONE) 01586 { 01587 if (bundle.getBlocks().size() == 0) 01588 { 01589 IBRCOMMON_LOGGER(error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL; 01590 throw SQLiteQueryException("no blocks found"); 01591 } 01592 } 01593 else 01594 { 01595 IBRCOMMON_LOGGER(error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL; 01596 throw SQLiteQueryException("can not query for blocks"); 01597 } 01598 } 01599 01600 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage) 01601 { 01602 // until IDLE is false 01603 while (true) 01604 { 01605 /* 01606 * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space. 01607 * This empty space will be reused the next time new information is added to the database. But in the meantime, 01608 * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can 01609 * cause the information in the database to become fragmented - scrattered out all across the database file rather 01610 * than clustered together in one place. 01611 * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous, 01612 * and otherwise cleans up the database file structure. 01613 */ 01614 { 01615 AutoResetLock l(storage._locks[VACUUM], storage._statements[VACUUM]); 01616 sqlite3_step(storage._statements[VACUUM]); 01617 } 01618 01619 // here we can do some IDLE stuff... 01620 ::sleep(1); 01621 01622 ibrcommon::MutexLock l(TaskIdle::_mutex); 01623 if (!TaskIdle::_idle) return; 01624 } 01625 } 01626 01627 const std::string SQLiteBundleStorage::getName() const 01628 { 01629 return "SQLiteBundleStorage"; 01630 } 01631 01632 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id) 01633 { 01634 // custody is successful transferred to another node. 01635 // it is safe to delete this bundle now. (depending on the routing algorithm.) 01636 // update the custodian of this bundle with the new one 01637 update_custodian(id, custodian); 01638 } 01639 01640 void SQLiteBundleStorage::new_expire_time(size_t ttl) 01641 { 01642 ibrcommon::MutexLock l(_next_expiration_lock); 01643 if (_next_expiration == 0 || ttl < _next_expiration) 01644 { 01645 _next_expiration = ttl; 01646 } 01647 } 01648 01649 void SQLiteBundleStorage::reset_expire_time() 01650 { 01651 ibrcommon::MutexLock l(_next_expiration_lock); 01652 _next_expiration = 0; 01653 } 01654 01655 size_t SQLiteBundleStorage::get_expire_time() 01656 { 01657 ibrcommon::MutexLock l(_next_expiration_lock); 01658 return _next_expiration; 01659 } 01660 01661 void SQLiteBundleStorage::add_deletion(const dtn::data::BundleID &id) 01662 { 01663 ibrcommon::MutexLock l(_deletion_mutex); 01664 _deletion_list.insert(id); 01665 } 01666 01667 void SQLiteBundleStorage::remove_deletion(const dtn::data::BundleID &id) 01668 { 01669 ibrcommon::MutexLock l(_deletion_mutex); 01670 _deletion_list.erase(id); 01671 } 01672 01673 bool SQLiteBundleStorage::contains_deletion(const dtn::data::BundleID &id) 01674 { 01675 ibrcommon::MutexLock l(_deletion_mutex); 01676 return (_deletion_list.find(id) != _deletion_list.end()); 01677 } 01678 01679 void SQLiteBundleStorage::trylock() throw (ibrcommon::MutexException) 01680 { 01681 // not implemented 01682 throw ibrcommon::MutexException(); 01683 } 01684 01685 void SQLiteBundleStorage::enter() throw (ibrcommon::MutexException) 01686 { 01687 // lock all statements 01688 for (int i = 0; i < SQL_QUERIES_END; i++) 01689 { 01690 // lock the statement 01691 _locks[i].enter(); 01692 } 01693 } 01694 01695 void SQLiteBundleStorage::leave() throw (ibrcommon::MutexException) 01696 { 01697 // unlock all statements 01698 for (int i = 0; i < SQL_QUERIES_END; i++) 01699 { 01700 // unlock the statement 01701 _locks[i].leave(); 01702 } 01703 } 01704 01705 void SQLiteBundleStorage::set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset) const 01706 { 01707 const std::string source_id = id.source.getString(); 01708 sqlite3_bind_text(st, offset + 1, source_id.c_str(), source_id.length(), SQLITE_TRANSIENT); 01709 sqlite3_bind_int64(st, offset + 2, id.timestamp); 01710 sqlite3_bind_int64(st, offset + 3, id.sequencenumber); 01711 01712 if (id.fragment) 01713 { 01714 sqlite3_bind_int64(st, offset + 4, id.offset); 01715 } 01716 } 01717 01718 void SQLiteBundleStorage::get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset) const 01719 { 01720 id.source = dtn::data::EID((const char*)sqlite3_column_text(st, offset + 0)); 01721 id.timestamp = sqlite3_column_int64(st, offset + 1); 01722 id.sequencenumber = sqlite3_column_int64(st, offset + 2); 01723 01724 id.fragment = (sqlite3_column_text(st, offset + 3) != NULL); 01725 id.offset = sqlite3_column_int64(st, offset + 3); 01726 } 01727 } 01728 }