|
IBR-DTNSuite 0.6
|
00001 /* 00002 * SQLiteBundleStorage.cpp 00003 * 00004 * Created on: 09.01.2010 00005 * Author: Myrtus 00006 */ 00007 00008 00009 #include "core/SQLiteBundleStorage.h" 00010 #include "core/TimeEvent.h" 00011 #include "core/GlobalEvent.h" 00012 #include "core/BundleCore.h" 00013 #include "core/SQLiteConfigure.h" 00014 #include "core/BundleEvent.h" 00015 #include "core/BundleExpiredEvent.h" 00016 00017 #include <ibrdtn/data/ScopeControlHopLimitBlock.h> 00018 #include <ibrdtn/data/PayloadBlock.h> 00019 #include <ibrdtn/data/AgeBlock.h> 00020 #include <ibrdtn/data/Serializer.h> 00021 #include <ibrdtn/data/Bundle.h> 00022 #include <ibrdtn/data/BundleID.h> 00023 #include <ibrdtn/utils/Clock.h> 00024 00025 #include <ibrcommon/thread/MutexLock.h> 00026 #include <ibrcommon/data/BLOB.h> 00027 #include <ibrcommon/AutoDelete.h> 00028 #include <ibrcommon/Logger.h> 00029 00030 namespace dtn 00031 { 00032 namespace core 00033 { 00034 void sql_tracer(void*, const char * pQuery) 00035 { 00036 IBRCOMMON_LOGGER_DEBUG(50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL; 00037 } 00038 00039 const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] = 00040 { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes" }; 00041 00042 const std::string SQLiteBundleStorage::_sql_queries[SQL_QUERIES_END] = 00043 { 00044 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC LIMIT ?,?;", 00045 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL LIMIT 1;", 00046 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;", 00047 "SELECT * FROM " + _tables[SQL_TABLE_BUNDLE] + " WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset != NULL ORDER BY fragmentoffset ASC;", 00048 00049 "SELECT source, timestamp, sequencenumber, fragmentoffset, procflags FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;", 00050 "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND ((a.fragmentoffset = b.fragmentoffset) OR ((a.fragmentoffset IS NULL) AND (b.fragmentoffset IS NULL))) AND a.expiretime <= ?;", 00051 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;", 00052 "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;", 00053 00054 "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;", 00055 "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";", 00056 00057 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;", 00058 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;", 00059 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";", 00060 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority, hopcount) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);", 00061 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;", 00062 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;", 00063 00064 "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;", 00065 00066 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL ORDER BY ordernumber ASC;", 00067 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;", 00068 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL AND ordernumber = ?;", 00069 "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;", 00070 "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";", 00071 "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);", 00072 00073 #ifdef SQLITE_STORAGE_EXTENDED 00074 "SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;", 00075 "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;", 00076 "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";", 00077 "INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);", 00078 00079 "SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;", 00080 "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;", 00081 "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";", 00082 "INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);", 00083 00084 "SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;", 00085 "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;", 00086 "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);", 00087 #endif 00088 00089 "VACUUM;" 00090 }; 00091 00092 const std::string SQLiteBundleStorage::_db_structure[10] = 00093 { 00094 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);", 00095 "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `appdatalength` INTEGER DEFAULT NULL, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL, `hopcount` INTEGER DEFAULT NULL);", 00096 "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);", 00097 "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);", 00098 "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);", 00099 "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND ((" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset IS NULL AND old.fragmentoffset IS NULL) OR (" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset)); END;", 00100 "CREATE UNIQUE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);", 00101 "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);", 00102 "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);", 00103 "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);" 00104 "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);" 00105 }; 00106 00107 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex; 00108 bool SQLiteBundleStorage::TaskIdle::_idle = false; 00109 00110 SQLiteBundleStorage::SQLBundleQuery::SQLBundleQuery() 00111 : _statement(NULL) 00112 { } 00113 00114 SQLiteBundleStorage::SQLBundleQuery::~SQLBundleQuery() 00115 { 00116 // free the statement if used before 00117 if (_statement != NULL) 00118 { 00119 sqlite3_finalize(_statement); 00120 } 00121 } 00122 00123 SQLiteBundleStorage::AutoResetLock::AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st) 00124 : _lock(mutex), _st(st) 00125 { 00126 //sqlite3_clear_bindings(st); 00127 } 00128 00129 SQLiteBundleStorage::AutoResetLock::~AutoResetLock() 00130 { 00131 sqlite3_reset(_st); 00132 } 00133 00134 SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path) 00135 : _blobPath(path) 00136 { 00137 // generate a new temporary file 00138 _file = ibrcommon::TemporaryFile(_blobPath, "blob"); 00139 } 00140 00141 SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB() 00142 { 00143 // delete the file if the last reference is destroyed 00144 _file.remove(); 00145 } 00146 00147 void SQLiteBundleStorage::SQLiteBLOB::clear() 00148 { 00149 // close the file 00150 _filestream.close(); 00151 00152 // remove the old file 00153 _file.remove(); 00154 00155 // generate a new temporary file 00156 _file = ibrcommon::TemporaryFile(_blobPath, "blob"); 00157 00158 // open temporary file 00159 _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary ); 00160 00161 if (!_filestream.is_open()) 00162 { 00163 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL; 00164 throw ibrcommon::CanNotOpenFileException(_file); 00165 } 00166 } 00167 00168 void SQLiteBundleStorage::SQLiteBLOB::open() 00169 { 00170 ibrcommon::BLOB::_filelimit.wait(); 00171 00172 // open temporary file 00173 _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary ); 00174 00175 if (!_filestream.is_open()) 00176 { 00177 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL; 00178 throw ibrcommon::CanNotOpenFileException(_file); 00179 } 00180 } 00181 00182 void SQLiteBundleStorage::SQLiteBLOB::close() 00183 { 00184 // flush the filestream 00185 _filestream.flush(); 00186 00187 // close the file 00188 _filestream.close(); 00189 00190 ibrcommon::BLOB::_filelimit.post(); 00191 } 00192 00193 size_t SQLiteBundleStorage::SQLiteBLOB::__get_size() 00194 { 00195 return _file.size(); 00196 } 00197 00198 ibrcommon::BLOB::Reference SQLiteBundleStorage::create() 00199 { 00200 return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath)); 00201 } 00202 00203 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size) 00204 : dbPath(path), dbFile(path.get("sqlite.db")), dbSize(size), _next_expiration(0) 00205 { 00206 //Configure SQLite Library 00207 SQLiteConfigure::configure(); 00208 00209 // check if SQLite is thread-safe 00210 if (sqlite3_threadsafe() == 0) 00211 { 00212 IBRCOMMON_LOGGER(critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL; 00213 throw ibrcommon::Exception("need threading support in sqlite!"); 00214 } 00215 } 00216 00217 SQLiteBundleStorage::~SQLiteBundleStorage() 00218 { 00219 stop(); 00220 join(); 00221 00222 ibrcommon::MutexLock l(*this); 00223 00224 // free all statements 00225 for (int i = 0; i < SQL_QUERIES_END; i++) 00226 { 00227 // prepare the statement 00228 sqlite3_finalize(_statements[i]); 00229 } 00230 00231 //close Databaseconnection 00232 if (sqlite3_close(_database) != SQLITE_OK) 00233 { 00234 IBRCOMMON_LOGGER(error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL; 00235 } 00236 00237 // shutdown sqlite library 00238 SQLiteConfigure::shutdown(); 00239 } 00240 00241 void SQLiteBundleStorage::openDatabase(const ibrcommon::File &path) 00242 { 00243 ibrcommon::MutexLock l(*this); 00244 00245 // set the block path 00246 _blockPath = dbPath.get(_tables[SQL_TABLE_BLOCK]); 00247 _blobPath = dbPath.get("blob"); 00248 00249 //open the database 00250 if (sqlite3_open_v2(path.getPath().c_str(), &_database, SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) 00251 { 00252 IBRCOMMON_LOGGER(error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL; 00253 sqlite3_close(_database); 00254 throw ibrcommon::Exception("Unable to open sqlite database"); 00255 } 00256 00257 int err = 0; 00258 00259 // create all tables 00260 for (size_t i = 0; i < 10; i++) 00261 { 00262 sqlite3_stmt *st = prepare(_db_structure[i]); 00263 err = sqlite3_step(st); 00264 if(err != SQLITE_DONE) 00265 { 00266 IBRCOMMON_LOGGER(error) << "SQLiteBundleStorage: failed to create database" << err << IBRCOMMON_LOGGER_ENDL; 00267 } 00268 sqlite3_reset(st); 00269 sqlite3_finalize(st); 00270 } 00271 00272 // delete all old BLOB container 00273 _blobPath.remove(true); 00274 00275 // create BLOB folder 00276 ibrcommon::File::createDirectory( _blobPath ); 00277 00278 // create the bundle folder 00279 ibrcommon::File::createDirectory( _blockPath ); 00280 00281 // prepare all statements 00282 for (int i = 0; i < SQL_QUERIES_END; i++) 00283 { 00284 // prepare the statement 00285 int err = sqlite3_prepare_v2(_database, _sql_queries[i].c_str(), _sql_queries[i].length(), &_statements[i], 0); 00286 if ( err != SQLITE_OK ) 00287 { 00288 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failed to prepare statement: " << err << " with query: " << _sql_queries[i] << IBRCOMMON_LOGGER_ENDL; 00289 } 00290 } 00291 00292 // disable synchronous mode 00293 sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL); 00294 00295 // enable sqlite tracing if debug level is higher than 50 00296 if (IBRCOMMON_LOGGER_LEVEL >= 50) 00297 { 00298 sqlite3_trace(_database, &sql_tracer, NULL); 00299 } 00300 } 00301 00302 void SQLiteBundleStorage::componentRun() 00303 { 00304 // loop until aborted 00305 try { 00306 while (true) 00307 { 00308 Task *t = _tasks.getnpop(true); 00309 00310 try { 00311 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t); 00312 try { 00313 btask.run(*this); 00314 } catch (const std::exception&) { 00315 btask.abort(); 00316 continue; 00317 }; 00318 btask.done(); 00319 continue; 00320 } catch (const std::bad_cast&) { }; 00321 00322 try { 00323 ibrcommon::AutoDelete<Task> killer(t); 00324 t->run(*this); 00325 } catch (const std::exception&) { }; 00326 } 00327 } catch (const ibrcommon::QueueUnblockedException &ex) { 00328 // we are aborted, abort all blocking tasks 00329 } 00330 } 00331 00332 void SQLiteBundleStorage::componentUp() 00333 { 00334 //register Events 00335 bindEvent(TimeEvent::className); 00336 bindEvent(GlobalEvent::className); 00337 00338 // open the database and create all folders and files if needed 00339 openDatabase(dbFile); 00340 00341 //Check if the database is consistent with the filesystem 00342 check_consistency(); 00343 00344 // calculate next Bundleexpiredtime 00345 update_expire_time(); 00346 }; 00347 00348 void SQLiteBundleStorage::componentDown() 00349 { 00350 //unregister Events 00351 unbindEvent(TimeEvent::className); 00352 unbindEvent(GlobalEvent::className); 00353 }; 00354 00355 bool SQLiteBundleStorage::__cancellation() 00356 { 00357 _tasks.abort(); 00358 return true; 00359 } 00360 00361 void SQLiteBundleStorage::check_consistency() 00362 { 00363 /* 00364 * check if the database is consistent with the files on the HDD. Therefore every file of the database is put in a list. 00365 * When the files of the database are scanned it is checked if the actual file is in the list of files of the filesystem. if it is so the entry 00366 * of the list of files of the filesystem is deleted. if not the file is put in a seperate list. After this procedure there are 2 lists containing file 00367 * which are inconsistent and can be deleted. 00368 */ 00369 set<string> blockFiles,payloadfiles; 00370 string datei; 00371 00372 list<ibrcommon::File> blist; 00373 list<ibrcommon::File>::iterator file_it; 00374 00375 //1. Durchsuche die Dateien 00376 _blockPath.getFiles(blist); 00377 for(file_it = blist.begin(); file_it != blist.end(); file_it++) 00378 { 00379 datei = (*file_it).getPath(); 00380 if(datei[datei.size()-1] != '.') 00381 { 00382 if(((*file_it).getBasename())[0] == 'b') 00383 { 00384 blockFiles.insert(datei); 00385 } 00386 else 00387 payloadfiles.insert(datei); 00388 } 00389 } 00390 00391 //3. Check consistency of the bundles 00392 check_bundles(blockFiles); 00393 00394 //4. Check consistency of the fragments 00395 check_fragments(payloadfiles); 00396 } 00397 00398 void SQLiteBundleStorage::check_fragments(std::set<std::string> &payloadFiles) 00399 { 00400 int err; 00401 size_t timestamp, sequencenumber, fragmentoffset; 00402 string filename, source, dest, custody, repto; 00403 set<string> consistenFiles, inconsistenSources; 00404 set<size_t> inconsistentTimestamp, inconsistentSeq_number; 00405 set<string>::iterator file_it, consisten_it; 00406 00407 sqlite3_stmt *getPayloadfiles = prepare("SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength, hopcount FROM "+_tables[SQL_TABLE_BUNDLE]+" WHERE fragmentoffset != NULL;"); 00408 00409 for(err = sqlite3_step(getPayloadfiles); err == SQLITE_ROW; err = sqlite3_step(getPayloadfiles)) 00410 { 00411 filename = (const char*)sqlite3_column_text(getPayloadfiles,10); 00412 file_it = payloadFiles.find(filename); 00413 00414 00415 if (file_it == payloadFiles.end()) 00416 { 00417 consisten_it = consistenFiles.find(*file_it); 00418 00419 //inconsistent DB entry 00420 if(consisten_it == consistenFiles.end()) 00421 { 00422 //Generate Report 00423 source = (const char*) sqlite3_column_text(getPayloadfiles, 0); 00424 dest = (const char*) sqlite3_column_text(getPayloadfiles, 1); 00425 repto = (const char*) sqlite3_column_text(getPayloadfiles, 2); 00426 custody = (const char*) sqlite3_column_text(getPayloadfiles, 3); 00427 00428 timestamp = sqlite3_column_int64(getPayloadfiles, 5); 00429 sequencenumber = sqlite3_column_int64(getPayloadfiles, 6); 00430 fragmentoffset = sqlite3_column_int64(getPayloadfiles, 8); 00431 00432 dtn::data::BundleID id( dtn::data::EID(source), timestamp, sequencenumber, true, fragmentoffset ); 00433 dtn::data::MetaBundle mb(id); 00434 00435 mb.procflags = sqlite3_column_int64(getPayloadfiles, 4); 00436 mb.lifetime = sqlite3_column_int64(getPayloadfiles, 7); 00437 mb.destination = dest; 00438 mb.reportto = repto; 00439 mb.custodian = custody; 00440 mb.appdatalength = sqlite3_column_int64(getPayloadfiles, 9); 00441 mb.expiretime = dtn::utils::Clock::getExpireTime(timestamp, mb.lifetime); 00442 00443 if (sqlite3_column_type(getPayloadfiles, 10) != SQLITE_NULL) 00444 { 00445 mb.hopcount = sqlite3_column_int64(getPayloadfiles, 10); 00446 } 00447 00448 dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE); 00449 } 00450 } 00451 //consistent DB entry 00452 else 00453 consistenFiles.insert(*file_it); 00454 payloadFiles.erase(file_it); 00455 } 00456 00457 sqlite3_reset(getPayloadfiles); 00458 sqlite3_finalize(getPayloadfiles); 00459 } 00460 00461 void SQLiteBundleStorage::check_bundles(std::set<std::string> &blockFiles) 00462 { 00463 std::set<dtn::data::BundleID> corrupt_bundle_ids; 00464 00465 sqlite3_stmt *blockConistencyCheck = prepare("SELECT source_id, timestamp, sequencenumber, fragmentoffset, filename, ordernumber FROM "+ _tables[SQL_TABLE_BLOCK] +";"); 00466 00467 while (sqlite3_step(blockConistencyCheck) == SQLITE_ROW) 00468 { 00469 dtn::data::BundleID id; 00470 00471 // get the bundleid 00472 get_bundleid(blockConistencyCheck, id); 00473 00474 // get the filename 00475 std::string filename = (const char*)sqlite3_column_text(blockConistencyCheck, 4); 00476 00477 // if the filename is not in the list of block files 00478 if (blockFiles.find(filename) == blockFiles.end()) 00479 { 00480 // add the bundle to the deletion list 00481 corrupt_bundle_ids.insert(id); 00482 } 00483 else 00484 { 00485 // file is available, everything is fine 00486 blockFiles.erase(filename); 00487 } 00488 } 00489 sqlite3_reset(blockConistencyCheck); 00490 sqlite3_finalize(blockConistencyCheck); 00491 00492 for(std::set<dtn::data::BundleID>::const_iterator iter = corrupt_bundle_ids.begin(); iter != corrupt_bundle_ids.end(); iter++) 00493 { 00494 try { 00495 const dtn::data::BundleID &id = (*iter); 00496 00497 // get meta data of this bundle 00498 const dtn::data::MetaBundle m = get_meta(id); 00499 00500 // remove the hole bundle 00501 remove(id); 00502 } catch (const dtn::core::SQLiteBundleStorage::SQLiteQueryException&) { }; 00503 } 00504 00505 // delete block files still in the blockfile list 00506 for (std::set<std::string>::const_iterator iter = blockFiles.begin(); iter != blockFiles.end(); iter++) 00507 { 00508 ibrcommon::File blockfile(*iter); 00509 blockfile.remove(); 00510 } 00511 } 00512 00513 00514 sqlite3_stmt* SQLiteBundleStorage::prepare(const std::string &sqlquery) 00515 { 00516 // prepare the statement 00517 sqlite3_stmt *statement; 00518 int err = sqlite3_prepare_v2(_database, sqlquery.c_str(), sqlquery.length(), &statement, 0); 00519 if ( err != SQLITE_OK ) 00520 { 00521 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << IBRCOMMON_LOGGER_ENDL; 00522 } 00523 return statement; 00524 } 00525 00526 dtn::data::MetaBundle SQLiteBundleStorage::get_meta(const dtn::data::BundleID &id) 00527 { 00528 dtn::data::MetaBundle bundle; 00529 00530 // check if the bundle is already on the deletion list 00531 if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException(); 00532 00533 size_t stmt_key = BUNDLE_GET_ID; 00534 if (id.fragment) stmt_key = FRAGMENT_GET_ID; 00535 00536 // lock the prepared statement 00537 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]); 00538 00539 // bind bundle id to the statement 00540 set_bundleid(_statements[stmt_key], id); 00541 00542 // execute the query and check for error 00543 if (sqlite3_step(_statements[stmt_key]) != SQLITE_ROW) 00544 { 00545 stringstream error; 00546 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString(); 00547 IBRCOMMON_LOGGER_DEBUG(15) << error.str() << IBRCOMMON_LOGGER_ENDL; 00548 throw SQLiteQueryException(error.str()); 00549 } 00550 00551 // query bundle data 00552 get(_statements[stmt_key], bundle); 00553 00554 return bundle; 00555 } 00556 00557 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset) 00558 { 00559 bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) ); 00560 bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) ); 00561 bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) ); 00562 bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) ); 00563 bundle.procflags = sqlite3_column_int(st, offset + 4); 00564 bundle.timestamp = sqlite3_column_int64(st, offset + 5); 00565 bundle.sequencenumber = sqlite3_column_int64(st, offset + 6); 00566 bundle.lifetime = sqlite3_column_int64(st, offset + 7); 00567 bundle.expiretime = dtn::utils::Clock::getExpireTime(bundle.timestamp, bundle.lifetime); 00568 00569 if (bundle.procflags & data::Bundle::FRAGMENT) 00570 { 00571 bundle.offset = sqlite3_column_int64(st, offset + 8); 00572 bundle.appdatalength = sqlite3_column_int64(st, offset + 9); 00573 } 00574 00575 if (sqlite3_column_type(st, offset + 10) != SQLITE_NULL) 00576 { 00577 bundle.hopcount = sqlite3_column_int64(st, 10); 00578 } 00579 } 00580 00581 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset) 00582 { 00583 bundle._source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) ); 00584 bundle._destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) ); 00585 bundle._reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) ); 00586 bundle._custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) ); 00587 bundle._procflags = sqlite3_column_int(st, offset + 4); 00588 bundle._timestamp = sqlite3_column_int64(st, offset + 5); 00589 bundle._sequencenumber = sqlite3_column_int64(st, offset + 6); 00590 bundle._lifetime = sqlite3_column_int64(st, offset + 7); 00591 00592 if (bundle._procflags & data::Bundle::FRAGMENT) 00593 { 00594 bundle._fragmentoffset = sqlite3_column_int64(st, offset + 8); 00595 bundle._appdatalength = sqlite3_column_int64(st, offset + 9); 00596 } 00597 } 00598 00599 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb) 00600 { 00601 std::list<dtn::data::MetaBundle> ret; 00602 00603 const std::string base_query = 00604 "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength, hopcount FROM " + _tables[SQL_TABLE_BUNDLE]; 00605 00606 sqlite3_stmt *st = NULL; 00607 size_t bind_offset = 1; 00608 00609 try { 00610 SQLBundleQuery &query = dynamic_cast<SQLBundleQuery&>(cb); 00611 00612 if (query._statement == NULL) 00613 { 00614 std::string sql = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC"; 00615 00616 if (cb.limit() > 0) 00617 { 00618 sql += " LIMIT ?,?"; 00619 } 00620 00621 // add closing delimiter 00622 sql += ";"; 00623 00624 // prepare the hole statement 00625 query._statement = prepare(sql); 00626 } 00627 00628 st = query._statement; 00629 bind_offset = query.bind(st, 1); 00630 } catch (const std::bad_cast&) { 00631 // this query is not optimized for sql, use the generic way (slow) 00632 st = _statements[BUNDLE_GET_FILTER]; 00633 }; 00634 00635 size_t offset = 0; 00636 while (true) 00637 { 00638 // lock the database 00639 AutoResetLock l(_locks[BUNDLE_GET_FILTER], st); 00640 00641 if (cb.limit() > 0) 00642 { 00643 sqlite3_bind_int64(st, bind_offset, offset); 00644 sqlite3_bind_int64(st, bind_offset + 1, cb.limit()); 00645 } 00646 00647 if (sqlite3_step(st) == SQLITE_DONE) 00648 { 00649 // returned no result 00650 break; 00651 } 00652 00653 while (true) 00654 { 00655 dtn::data::MetaBundle m; 00656 00657 // extract the primary values and set them in the bundle object 00658 get(st, m, 0); 00659 00660 // check if the bundle is already on the deletion list 00661 if (!contains_deletion(m)) 00662 { 00663 // ask the filter if this bundle should be added to the return list 00664 if (cb.shouldAdd(m)) 00665 { 00666 IBRCOMMON_LOGGER_DEBUG(40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL; 00667 00668 // add the bundle to the list 00669 ret.push_back(m); 00670 } 00671 } 00672 00673 if (sqlite3_step(st) != SQLITE_ROW) 00674 { 00675 break; 00676 } 00677 00678 // abort if enough bundles are found 00679 if ((cb.limit() > 0) && (ret.size() >= cb.limit())) break; 00680 } 00681 00682 // if a limit is set 00683 if (cb.limit() > 0) 00684 { 00685 // increment the offset, because we might not have enough 00686 offset += cb.limit(); 00687 00688 // abort if enough bundles are found 00689 if (ret.size() >= cb.limit()) break; 00690 } 00691 else 00692 { 00693 break; 00694 } 00695 } 00696 00697 return ret; 00698 } 00699 00700 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id) 00701 { 00702 dtn::data::Bundle bundle; 00703 int err = 0; 00704 00705 IBRCOMMON_LOGGER_DEBUG(25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL; 00706 00707 // if bundle is deleted? 00708 if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException(); 00709 00710 size_t stmt_key = BUNDLE_GET_ID; 00711 if (id.fragment) stmt_key = FRAGMENT_GET_ID; 00712 00713 // do this while db is locked 00714 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]); 00715 00716 // set the bundle key values 00717 set_bundleid(_statements[stmt_key], id); 00718 00719 // execute the query and check for error 00720 if ((err = sqlite3_step(_statements[stmt_key])) != SQLITE_ROW) 00721 { 00722 IBRCOMMON_LOGGER_DEBUG(15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL; 00723 throw dtn::core::BundleStorage::NoBundleFoundException(); 00724 } 00725 00726 // read bundle data 00727 get(_statements[stmt_key], bundle); 00728 00729 try { 00730 // read all blocks 00731 get_blocks(bundle, id); 00732 } catch (const ibrcommon::Exception &ex) { 00733 IBRCOMMON_LOGGER(error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00734 throw dtn::core::BundleStorage::NoBundleFoundException(); 00735 } 00736 00737 return bundle; 00738 } 00739 00740 #ifdef SQLITE_STORAGE_EXTENDED 00741 void SQLiteBundleStorage::setPriority(const int priority, const dtn::data::BundleID &id) 00742 { 00743 int err; 00744 size_t procflags; 00745 { 00746 sqlite3_bind_text(_statements[PROCFLAGS_GET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT); 00747 err = sqlite3_step(_statements[PROCFLAGS_GET]); 00748 if(err == SQLITE_ROW) 00749 procflags = sqlite3_column_int(_statements[PROCFLAGS_GET],0); 00750 sqlite3_reset(_statements[PROCFLAGS_GET]); 00751 if(err != SQLITE_DONE){ 00752 stringstream error; 00753 error << "SQLiteBundleStorage: error while Select Querry: " << err; 00754 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00755 throw SQLiteQueryException(error.str()); 00756 } 00757 00758 //Priority auf 0 Setzen 00759 procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2)); 00760 00761 //Set Prioritybits 00762 switch(priority){ 00763 case 1: procflags += data::Bundle::PRIORITY_BIT1; break; //Set PRIORITY_BIT1 00764 case 2: procflags += data::Bundle::PRIORITY_BIT2; break; 00765 case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break; 00766 } 00767 00768 sqlite3_bind_text(_statements[PROCFLAGS_SET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT); 00769 sqlite3_bind_int64(_statements[PROCFLAGS_SET],2,priority); 00770 sqlite3_step(_statements[PROCFLAGS_SET]); 00771 sqlite3_reset(_statements[PROCFLAGS_SET]); 00772 00773 if(err != SQLITE_DONE) 00774 { 00775 stringstream error; 00776 error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err; 00777 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00778 throw SQLiteQueryException(error.str()); 00779 } 00780 } 00781 } 00782 #endif 00783 00784 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle) 00785 { 00786 IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL; 00787 00788 // // if the bundle is a fragment store it as one 00789 // if (bundle.get(dtn::data::Bundle::FRAGMENT) && local) 00790 // { 00791 // storeFragment(bundle); 00792 // return; 00793 // } 00794 00795 int err; 00796 00797 const dtn::data::EID _sourceid = bundle._source; 00798 size_t TTL = bundle._timestamp + bundle._lifetime; 00799 00800 AutoResetLock l(_locks[BUNDLE_STORE], _statements[BUNDLE_STORE]); 00801 00802 set_bundleid(_statements[BUNDLE_STORE], bundle); 00803 00804 sqlite3_bind_text(_statements[BUNDLE_STORE], 5, bundle._source.getString().c_str(), bundle._source.getString().length(), SQLITE_TRANSIENT); 00805 sqlite3_bind_text(_statements[BUNDLE_STORE], 6, bundle._destination.getString().c_str(), bundle._destination.getString().length(), SQLITE_TRANSIENT); 00806 sqlite3_bind_text(_statements[BUNDLE_STORE], 7, bundle._reportto.getString().c_str(), bundle._reportto.getString().length(), SQLITE_TRANSIENT); 00807 sqlite3_bind_text(_statements[BUNDLE_STORE], 8, bundle._custodian.getString().c_str(), bundle._custodian.getString().length(), SQLITE_TRANSIENT); 00808 sqlite3_bind_int(_statements[BUNDLE_STORE], 9, bundle._procflags); 00809 sqlite3_bind_int64(_statements[BUNDLE_STORE], 10, bundle._lifetime); 00810 00811 if (bundle.get(dtn::data::Bundle::FRAGMENT)) 00812 { 00813 sqlite3_bind_int64(_statements[BUNDLE_STORE], 11, bundle._appdatalength); 00814 } 00815 else 00816 { 00817 sqlite3_bind_null(_statements[BUNDLE_STORE], 4); 00818 sqlite3_bind_null(_statements[BUNDLE_STORE], 11); 00819 } 00820 00821 sqlite3_bind_int64(_statements[BUNDLE_STORE], 12, TTL); 00822 sqlite3_bind_int64(_statements[BUNDLE_STORE], 13, dtn::data::MetaBundle(bundle).getPriority()); 00823 00824 try { 00825 const dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<const dtn::data::ScopeControlHopLimitBlock>(); 00826 sqlite3_bind_int64(_statements[BUNDLE_STORE], 14, schl.getHopsToLive() ); 00827 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { 00828 sqlite3_bind_null(_statements[BUNDLE_STORE], 14 ); 00829 }; 00830 00831 // start a transaction 00832 sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, NULL); 00833 00834 // store the bundle data in the database 00835 err = sqlite3_step(_statements[BUNDLE_STORE]); 00836 00837 if (err == SQLITE_CONSTRAINT) 00838 { 00839 IBRCOMMON_LOGGER(warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL; 00840 } 00841 else if (err != SQLITE_DONE) 00842 { 00843 stringstream error; 00844 error << "SQLiteBundleStorage: store() failure: " << err << " " << sqlite3_errmsg(_database); 00845 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00846 00847 // rollback the whole transaction 00848 sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, NULL); 00849 00850 throw SQLiteQueryException(error.str()); 00851 } 00852 else 00853 { 00854 // stores the blocks to a file 00855 store_blocks(bundle); 00856 00857 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL; 00858 } 00859 00860 // commit the transaction 00861 sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, NULL); 00862 00863 // set new expire time 00864 new_expire_time(TTL); 00865 00866 try { 00867 // the bundle is stored sucessfully, we could accept custody if it is requested 00868 const dtn::data::EID custodian = acceptCustody(bundle); 00869 00870 // update the custody address of this bundle 00871 update_custodian(bundle, custodian); 00872 } catch (const ibrcommon::Exception&) { 00873 // this bundle has no request for custody transfers 00874 } 00875 } 00876 00877 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id) 00878 { 00879 add_deletion(id); 00880 _tasks.push(new TaskRemove(id)); 00881 } 00882 00883 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage) 00884 { 00885 { 00886 // select the right statement to use 00887 const size_t stmt_key = _id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID; 00888 00889 // lock the database 00890 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]); 00891 00892 // set the bundle key values 00893 storage.set_bundleid(storage._statements[stmt_key], _id); 00894 00895 // step through all blocks 00896 while (sqlite3_step(storage._statements[stmt_key]) == SQLITE_ROW) 00897 { 00898 // delete each referenced block file 00899 ibrcommon::File blockfile( (const char*)sqlite3_column_text(storage._statements[stmt_key], 0) ); 00900 blockfile.remove(); 00901 } 00902 } 00903 00904 { 00905 const size_t stmt_key = _id.fragment ? FRAGMENT_DELETE : BUNDLE_DELETE; 00906 00907 // lock the database 00908 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]); 00909 00910 // then remove the bundle data 00911 storage.set_bundleid(storage._statements[stmt_key], _id); 00912 sqlite3_step(storage._statements[stmt_key]); 00913 00914 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << _id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL; 00915 } 00916 00917 //update deprecated timer 00918 storage.update_expire_time(); 00919 00920 // remove it from the deletion list 00921 storage.remove_deletion(_id); 00922 } 00923 00924 #ifdef SQLITE_STORAGE_EXTENDED 00925 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key) 00926 { 00927 int err; 00928 string result; 00929 00930 sqlite3_bind_text(_statements[ROUTING_GET],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT); 00931 sqlite3_bind_int(_statements[ROUTING_GET],2,key); 00932 err = sqlite3_step(_statements[ROUTING_GET]); 00933 if(err == SQLITE_ROW){ 00934 result = (const char*) sqlite3_column_text(_statements[ROUTING_GET],0); 00935 } 00936 sqlite3_reset(_statements[ROUTING_GET]); 00937 if(err != SQLITE_DONE && err != SQLITE_ROW){ 00938 stringstream error; 00939 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err; 00940 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00941 throw SQLiteQueryException(error.str()); 00942 } 00943 00944 return result; 00945 } 00946 00947 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key) 00948 { 00949 int err; 00950 string result; 00951 00952 err = sqlite3_bind_text(_statements[NODE_GET],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT); 00953 err = sqlite3_bind_int(_statements[NODE_GET],2,key); 00954 err = sqlite3_step(_statements[NODE_GET]); 00955 if(err == SQLITE_ROW){ 00956 result = (const char*) sqlite3_column_text(_statements[NODE_GET],0); 00957 } 00958 sqlite3_reset(_statements[NODE_GET]); 00959 if(err != SQLITE_DONE && err != SQLITE_ROW){ 00960 stringstream error; 00961 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database); 00962 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00963 throw SQLiteQueryException(error.str()); 00964 } 00965 00966 return result; 00967 } 00968 00969 std::string SQLiteBundleStorage::getRoutingInfo(const int &key) 00970 { 00971 int err; 00972 string result; 00973 00974 sqlite3_bind_int(_statements[INFO_GET],1,key); 00975 err = sqlite3_step(_statements[INFO_GET]); 00976 if(err == SQLITE_ROW){ 00977 result = (const char*) sqlite3_column_text(_statements[INFO_GET],0); 00978 } 00979 sqlite3_reset(_statements[INFO_GET]); 00980 if(err != SQLITE_DONE && err != SQLITE_ROW){ 00981 stringstream error; 00982 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database); 00983 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 00984 throw SQLiteQueryException(error.str()); 00985 } 00986 00987 return result; 00988 } 00989 00990 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo) 00991 { 00992 int err; 00993 00994 sqlite3_bind_text(_statements[ROUTING_STORE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT); 00995 sqlite3_bind_int(_statements[ROUTING_STORE],2,key); 00996 sqlite3_bind_text(_statements[ROUTING_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT); 00997 err = sqlite3_step(_statements[ROUTING_STORE]); 00998 sqlite3_reset(_statements[ROUTING_STORE]); 00999 if(err == SQLITE_CONSTRAINT) 01000 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl; 01001 else if(err != SQLITE_DONE){ 01002 stringstream error; 01003 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err; 01004 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01005 throw SQLiteQueryException(error.str()); 01006 } 01007 } 01008 01009 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo) 01010 { 01011 int err; 01012 01013 sqlite3_bind_text(_statements[NODE_STORE],1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT); 01014 sqlite3_bind_int(_statements[NODE_STORE],2,key); 01015 sqlite3_bind_text(_statements[NODE_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT); 01016 err = sqlite3_step(_statements[NODE_STORE]); 01017 sqlite3_reset(_statements[NODE_STORE]); 01018 if(err == SQLITE_CONSTRAINT) 01019 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl; 01020 else if(err != SQLITE_DONE){ 01021 stringstream error; 01022 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err; 01023 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01024 throw SQLiteQueryException(error.str()); 01025 } 01026 } 01027 01028 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo) 01029 { 01030 int err; 01031 01032 sqlite3_bind_int(_statements[INFO_STORE],1,key); 01033 sqlite3_bind_text(_statements[INFO_STORE],2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT); 01034 err = sqlite3_step(_statements[INFO_STORE]); 01035 sqlite3_reset(_statements[INFO_STORE]); 01036 if(err == SQLITE_CONSTRAINT) 01037 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl; 01038 else if(err != SQLITE_DONE){ 01039 stringstream error; 01040 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err; 01041 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01042 throw SQLiteQueryException(error.str()); 01043 } 01044 } 01045 01046 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key) 01047 { 01048 int err; 01049 01050 sqlite3_bind_text(_statements[ROUTING_REMOVE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT); 01051 sqlite3_bind_int(_statements[ROUTING_REMOVE],2,key); 01052 err = sqlite3_step(_statements[ROUTING_REMOVE]); 01053 sqlite3_reset(_statements[ROUTING_REMOVE]); 01054 if(err != SQLITE_DONE){ 01055 stringstream error; 01056 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err; 01057 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01058 throw SQLiteQueryException(error.str()); 01059 } 01060 } 01061 01062 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key) 01063 { 01064 int err; 01065 01066 sqlite3_bind_text(_statements[NODE_REMOVE],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT); 01067 sqlite3_bind_int(_statements[NODE_REMOVE],2,key); 01068 err = sqlite3_step(_statements[NODE_REMOVE]); 01069 sqlite3_reset(_statements[NODE_REMOVE]); 01070 if(err != SQLITE_DONE){ 01071 stringstream error; 01072 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err; 01073 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01074 throw SQLiteQueryException(error.str()); 01075 } 01076 } 01077 01078 void SQLiteBundleStorage::removeRoutingInfo(const int &key) 01079 { 01080 int err; 01081 01082 sqlite3_bind_int(_statements[INFO_REMOVE],1,key); 01083 err = sqlite3_step(_statements[INFO_REMOVE]); 01084 sqlite3_reset(_statements[INFO_REMOVE]); 01085 if(err != SQLITE_DONE){ 01086 stringstream error; 01087 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err; 01088 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01089 throw SQLiteQueryException(error.str()); 01090 } 01091 } 01092 #endif 01093 01094 void SQLiteBundleStorage::clearAll() 01095 { 01096 #ifdef SQLITE_STORAGE_EXTENDED 01097 sqlite3_step(_statements[ROUTING_CLEAR]); 01098 sqlite3_reset(_statements[ROUTING_CLEAR]); 01099 sqlite3_step(_statements[NODE_CLEAR]); 01100 sqlite3_reset(_statements[NODE_CLEAR]); 01101 #endif 01102 01103 clear(); 01104 } 01105 01106 void SQLiteBundleStorage::clear() 01107 { 01108 AutoResetLock l(_locks[VACUUM], _statements[VACUUM]); 01109 01110 sqlite3_step(_statements[BUNDLE_CLEAR]); 01111 sqlite3_reset(_statements[BUNDLE_CLEAR]); 01112 sqlite3_step(_statements[BLOCK_CLEAR]); 01113 sqlite3_reset(_statements[BLOCK_CLEAR]); 01114 01115 if(SQLITE_DONE != sqlite3_step(_statements[VACUUM])) 01116 { 01117 sqlite3_reset(_statements[VACUUM]); 01118 throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed."); 01119 } 01120 01121 //Delete Folder SQL_TABLE_BLOCK containing Blocks 01122 { 01123 _blockPath.remove(true); 01124 ibrcommon::File::createDirectory(_blockPath); 01125 } 01126 01127 reset_expire_time(); 01128 } 01129 01130 01131 01132 bool SQLiteBundleStorage::empty() 01133 { 01134 AutoResetLock l(_locks[EMPTY_CHECK], _statements[EMPTY_CHECK]); 01135 01136 if (SQLITE_DONE == sqlite3_step(_statements[EMPTY_CHECK])) 01137 { 01138 return true; 01139 } 01140 else 01141 { 01142 return false; 01143 } 01144 } 01145 01146 unsigned int SQLiteBundleStorage::count() 01147 { 01148 int rows = 0; 01149 int err = 0; 01150 01151 AutoResetLock l(_locks[COUNT_ENTRIES], _statements[COUNT_ENTRIES]); 01152 01153 if ((err = sqlite3_step(_statements[COUNT_ENTRIES])) == SQLITE_ROW) 01154 { 01155 rows = sqlite3_column_int(_statements[COUNT_ENTRIES], 0); 01156 } 01157 else 01158 { 01159 stringstream error; 01160 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(_database); 01161 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01162 throw SQLiteQueryException(error.str()); 01163 } 01164 01165 return rows; 01166 } 01167 01168 #ifdef SQLITE_STORAGE_EXTENDED 01169 int SQLiteBundleStorage::occupiedSpace() 01170 { 01171 int size = 0; 01172 std::list<ibrcommon::File> files; 01173 01174 if (_blockPath.getFiles(files)) 01175 { 01176 IBRCOMMON_LOGGER(error) << "occupiedSpace: unable to open Directory " << _blockPath.getPath() << IBRCOMMON_LOGGER_ENDL; 01177 return -1; 01178 } 01179 01180 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++) 01181 { 01182 const ibrcommon::File &f = (*iter); 01183 01184 if (!f.isSystem()) 01185 { 01186 size += f.size(); 01187 } 01188 } 01189 01190 // add db file size 01191 size += dbFile.size(); 01192 01193 return size; 01194 } 01195 01196 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle) 01197 { 01198 int err = 0; 01199 size_t bitcounter = 0; 01200 bool allFragmentsReceived(true); 01201 size_t payloadsize, TTL, fragmentoffset; 01202 01203 ibrcommon::File payloadfile, filename; 01204 01205 // generate a bundle id string 01206 std::string bundleID = dtn::data::BundleID(bundle).toString(); 01207 01208 // get the destination id string 01209 std::string destination = bundle._destination.getString(); 01210 01211 // get the source id string 01212 std::string sourceEID = bundle._source.getString(); 01213 01214 // calculate the expiration timestamp 01215 TTL = bundle._timestamp + bundle._lifetime; 01216 01217 const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>(); 01218 ibrcommon::BLOB::Reference payloadBlob = block.getBLOB(); 01219 01220 // get the payload size 01221 { 01222 ibrcommon::BLOB::iostream io = payloadBlob.iostream(); 01223 payloadsize = io.size(); 01224 } 01225 01226 //Saves the blocks from the first and the last Fragment, they may contain Extentionblock 01227 bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength; 01228 bool first = bundle._fragmentoffset == 0; 01229 01230 if(first || last) 01231 { 01232 int i = 0, blocknumber = 0; 01233 01234 // get the list of blocks 01235 std::list<const dtn::data::Block*> blocklist = bundle.getBlocks(); 01236 01237 // iterate through all blocks 01238 for (std::list<const dtn::data::Block* >::const_iterator it = blocklist.begin(); it != blocklist.end(); it++, i++) 01239 { 01240 // generate a temporary block file 01241 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01242 01243 ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary); 01244 01245 dtn::data::SeparateSerializer serializer(datei); 01246 serializer << (*(*it)); 01247 datei.close(); 01248 01249 /* Schreibt Blöcke in die DB. 01250 * Die Blocknummern werden nach folgendem Schema in die DB geschrieben: 01251 * Die Blöcke des ersten Fragments fangen bei -x an und gehen bis -1 01252 * Die Blöcke des letzten Fragments beginnen bei 1 und gehen bis x 01253 * Der Payloadblock wird an Stelle 0 eingefügt. 01254 */ 01255 sqlite3_bind_text(_statements[BLOCK_STORE], 1, bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT); 01256 sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType())); 01257 sqlite3_bind_text(_statements[BLOCK_STORE], 3, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT); 01258 01259 blocknumber = blocklist.size() - i; 01260 01261 if (first) 01262 { 01263 blocknumber = (blocklist.size() - i)*(-1); 01264 } 01265 01266 sqlite3_bind_int(_statements[BLOCK_STORE],4,blocknumber); 01267 executeQuery(_statements[BLOCK_STORE]); 01268 } 01269 } 01270 01271 //At first the database is checked for already received bundles and some 01272 //informations are read, which effect the following program flow. 01273 sqlite3_bind_text(_statements[BUNDLE_GET_FRAGMENT],1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT); 01274 sqlite3_bind_int64(_statements[BUNDLE_GET_FRAGMENT],2,bundle._timestamp); 01275 sqlite3_bind_int(_statements[BUNDLE_GET_FRAGMENT],3,bundle._sequencenumber); 01276 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]); 01277 01278 //Es ist noch kein Eintrag vorhanden: generiere Payloadfilename 01279 if(err == SQLITE_DONE) 01280 { 01281 // generate a temporary block file 01282 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01283 payloadfile = tmpfile; 01284 } 01285 //Es ist bereits ein eintrag vorhanden: speicher den payloadfilename 01286 else if (err == SQLITE_ROW) 01287 { 01288 payloadfile = ibrcommon::File( (const char*)sqlite3_column_text(_statements[BUNDLE_GET_FRAGMENT], 14) ); //14 = Payloadfilename 01289 } 01290 01291 while (err == SQLITE_ROW){ 01292 //The following codepice summs the payloadfragmentbytes, to check if all fragments were received 01293 //and the bundle is complete. 01294 fragmentoffset = sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],9); // 9 = fragmentoffset 01295 01296 if(bitcounter >= fragmentoffset){ 01297 bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],13); //13 = payloadlength 01298 } 01299 else if(bitcounter >= bundle._fragmentoffset){ 01300 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize; 01301 } 01302 else{ 01303 allFragmentsReceived = false; 01304 } 01305 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]); 01306 } 01307 01308 if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived) //Wenn das aktuelle fragment das größte ist, ist die Liste durchlaufen und das Frag nicht dazugezählt. Dies wird hier nachgeholt. 01309 { 01310 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize; 01311 } 01312 01313 sqlite3_reset(_statements[BUNDLE_GET_FRAGMENT]); 01314 if(err != SQLITE_DONE){ 01315 stringstream error; 01316 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(_database); 01317 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01318 throw SQLiteQueryException(error.str()); 01319 } 01320 01321 //Save the Payload in a separate file 01322 std::fstream datei(payloadfile.getPath().c_str(), std::ios::in | std::ios::out | std::ios::binary); 01323 datei.seekp(bundle._fragmentoffset); 01324 { 01325 ibrcommon::BLOB::iostream io = payloadBlob.iostream(); 01326 01327 size_t ret = 0; 01328 istream &s = (*io); 01329 s.seekg(0); 01330 01331 while (true) 01332 { 01333 char buf; 01334 s.get(buf); 01335 if(!s.eof()){ 01336 datei.put(buf); 01337 ret++; 01338 } 01339 else 01340 break; 01341 } 01342 datei.close(); 01343 } 01344 01345 //All fragments received 01346 if(bundle._appdatalength == bitcounter) 01347 { 01348 /* 01349 * 1. Payloadblock zusammensetzen 01350 * 2. PayloadBlock in der BlockTable sichern 01351 * 3. Primary BlockInfos in die BundleTable eintragen. 01352 * 4. Einträge der Fragmenttabelle löschen 01353 */ 01354 01355 // 1. Get the payloadblockobject 01356 { 01357 ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(payloadfile)); 01358 dtn::data::PayloadBlock pb(ref); 01359 01360 //Copy EID list 01361 if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS)) 01362 { 01363 std::list<dtn::data::EID> eidlist; 01364 std::list<dtn::data::EID>::iterator eidIt; 01365 eidlist = block.getEIDList(); 01366 01367 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++) 01368 { 01369 pb.addEID(*eidIt); 01370 } 01371 } 01372 01373 //Copy ProcFlags 01374 pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT)); 01375 pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED)); 01376 pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED)); 01377 pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK)); 01378 pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED)); 01379 pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED)); 01380 pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS)); 01381 01382 //2. Save the PayloadBlock to a file and store the metainformation in the BlockTable 01383 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01384 01385 std::ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary); 01386 dtn::data::SeparateSerializer serializer(datei); 01387 serializer << (pb); 01388 datei.close(); 01389 01390 filename = tmpfile; 01391 } 01392 01393 sqlite3_bind_text(_statements[BLOCK_STORE], 1,bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT); 01394 // sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType())); 01395 sqlite3_bind_text(_statements[BLOCK_STORE], 3, filename.getPath().c_str(), filename.getPath().length(), SQLITE_TRANSIENT); 01396 sqlite3_bind_int(_statements[BLOCK_STORE], 4, 0); 01397 executeQuery(_statements[BLOCK_STORE]); 01398 01399 //3. Delete reassembled Payload data 01400 payloadfile.remove(); 01401 01402 //4. Remove Fragment entries from BundleTable 01403 sqlite3_bind_text(_statements[BUNDLE_DELETE],1,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT); 01404 sqlite3_bind_int64(_statements[BUNDLE_DELETE],2,bundle._sequencenumber); 01405 sqlite3_bind_int64(_statements[BUNDLE_DELETE],3,bundle._timestamp); 01406 executeQuery(_statements[BUNDLE_DELETE]); 01407 01408 //5. Write the Primary Block to the BundleTable 01409 size_t procFlags = bundle._procflags; 01410 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT); 01411 01412 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT); 01413 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT); 01414 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT); 01415 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT); 01416 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT); 01417 sqlite3_bind_int(_statements[BUNDLE_STORE],6,procFlags); 01418 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp); 01419 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber); 01420 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime); 01421 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset); 01422 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength); 01423 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL); 01424 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL); 01425 sqlite3_bind_int(_statements[BUNDLE_STORE],14,NULL); 01426 sqlite3_bind_text(_statements[BUNDLE_STORE],15,NULL,0,SQLITE_TRANSIENT); 01427 executeQuery(_statements[BUNDLE_STORE]); 01428 } 01429 01430 //There are some fragments missing 01431 else{ 01432 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT); 01433 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT); 01434 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT); 01435 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT); 01436 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT); 01437 sqlite3_bind_int(_statements[BUNDLE_STORE],6,bundle._procflags); 01438 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp); 01439 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber); 01440 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime); 01441 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset); 01442 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength); 01443 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL); 01444 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL); 01445 sqlite3_bind_int(_statements[BUNDLE_STORE],14,payloadsize); 01446 sqlite3_bind_text(_statements[BUNDLE_STORE],15,payloadfile.getPath().c_str(),payloadfile.getPath().length(),SQLITE_TRANSIENT); 01447 executeQuery(_statements[BUNDLE_STORE]); 01448 } 01449 } 01450 #endif 01451 01452 void SQLiteBundleStorage::raiseEvent(const Event *evt) 01453 { 01454 try { 01455 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt); 01456 01457 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 01458 { 01459 _tasks.push(new TaskExpire(time.getTimestamp())); 01460 } 01461 } catch (const std::bad_cast&) { } 01462 01463 try { 01464 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt); 01465 01466 if(global.getAction() == GlobalEvent::GLOBAL_IDLE) 01467 { 01468 // switch to idle mode 01469 ibrcommon::MutexLock l(TaskIdle::_mutex); 01470 TaskIdle::_idle = true; 01471 01472 // generate an idle task 01473 _tasks.push(new TaskIdle()); 01474 } 01475 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY) 01476 { 01477 // switch back to non-idle mode 01478 ibrcommon::MutexLock l(TaskIdle::_mutex); 01479 TaskIdle::_idle = false; 01480 } 01481 } catch (const std::bad_cast&) { } 01482 } 01483 01484 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage) 01485 { 01486 /* 01487 * Only if the actual time is bigger or equal than the time when the next bundle expires, deleteexpired is called. 01488 */ 01489 size_t exp_time = storage.get_expire_time(); 01490 if ((_timestamp < exp_time) || (exp_time == 0)) return; 01491 01492 /* 01493 * Performanceverbesserung: Damit die Abfragen nicht jede Sekunde ausgeführt werden müssen, speichert man den Zeitpunkt an dem 01494 * das nächste Bündel gelöscht werden soll in eine Variable und führt deleteexpired erst dann aus wenn ein Bündel abgelaufen ist. 01495 * Nach dem Löschen wird die DB durchsucht und der nächste Ablaufzeitpunkt wird in die Variable gesetzt. 01496 */ 01497 01498 { 01499 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_FILENAMES], storage._statements[EXPIRE_BUNDLE_FILENAMES]); 01500 01501 // query for blocks of expired bundles 01502 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_FILENAMES], 1, _timestamp); 01503 while (sqlite3_step(storage._statements[EXPIRE_BUNDLE_FILENAMES]) == SQLITE_ROW) 01504 { 01505 ibrcommon::File block((const char*)sqlite3_column_text(storage._statements[EXPIRE_BUNDLE_FILENAMES],0)); 01506 block.remove(); 01507 } 01508 } 01509 01510 { 01511 AutoResetLock l(storage._locks[EXPIRE_BUNDLES], storage._statements[EXPIRE_BUNDLES]); 01512 01513 // query expired bundles 01514 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLES], 1, _timestamp); 01515 while (sqlite3_step(storage._statements[EXPIRE_BUNDLES]) == SQLITE_ROW) 01516 { 01517 dtn::data::BundleID id; 01518 storage.get_bundleid(storage._statements[EXPIRE_BUNDLES], id); 01519 dtn::core::BundleExpiredEvent::raise(id); 01520 } 01521 } 01522 01523 { 01524 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_DELETE], storage._statements[EXPIRE_BUNDLE_DELETE]); 01525 01526 // delete all expired db entries (bundles and blocks) 01527 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_DELETE], 1, _timestamp); 01528 sqlite3_step(storage._statements[EXPIRE_BUNDLE_DELETE]); 01529 } 01530 01531 //update deprecated timer 01532 storage.update_expire_time(); 01533 } 01534 01535 void SQLiteBundleStorage::update_expire_time() 01536 { 01537 ibrcommon::MutexLock l(_locks[EXPIRE_NEXT_TIMESTAMP]); 01538 int err = sqlite3_step(_statements[EXPIRE_NEXT_TIMESTAMP]); 01539 01540 if (err == SQLITE_ROW) 01541 { 01542 ibrcommon::MutexLock l(_next_expiration_lock); 01543 _next_expiration = sqlite3_column_int64(_statements[EXPIRE_NEXT_TIMESTAMP], 0); 01544 } 01545 else 01546 { 01547 ibrcommon::MutexLock l(_next_expiration_lock); 01548 _next_expiration = 0; 01549 } 01550 01551 sqlite3_reset(_statements[EXPIRE_NEXT_TIMESTAMP]); 01552 } 01553 01554 void SQLiteBundleStorage::update_custodian(const dtn::data::BundleID &id, const dtn::data::EID &custodian) 01555 { 01556 // select query with or without fragmentation extension 01557 STORAGE_STMT query = BUNDLE_UPDATE_CUSTODIAN; 01558 if (id.fragment) query = FRAGMENT_UPDATE_CUSTODIAN; 01559 01560 01561 AutoResetLock l(_locks[query], _statements[query]); 01562 01563 sqlite3_bind_text(_statements[query], 1, custodian.getString().c_str(), custodian.getString().length(), SQLITE_TRANSIENT); 01564 set_bundleid(_statements[query], id, 1); 01565 01566 // update the custodian in the database 01567 int err = sqlite3_step(_statements[query]); 01568 01569 if (err != SQLITE_DONE) 01570 { 01571 stringstream error; 01572 error << "SQLiteBundleStorage: update_custodian() failure: " << err << " " << sqlite3_errmsg(_database); 01573 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL; 01574 } 01575 } 01576 01577 int SQLiteBundleStorage::store_blocks(const data::Bundle &bundle) 01578 { 01579 int blocktyp, blocknumber(1), storedBytes(0); 01580 01581 // get all blocks of the bundle 01582 const list<const dtn::data::Block*> blocklist = bundle.getBlocks(); 01583 01584 // generate a bundle id 01585 dtn::data::BundleID id(bundle); 01586 01587 for(std::list<const dtn::data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++) 01588 { 01589 const dtn::data::Block &block = (**it); 01590 blocktyp = (int)block.getType(); 01591 01592 ibrcommon::TemporaryFile tmpfile(_blockPath, "block"); 01593 01594 try { 01595 try { 01596 const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block); 01597 ibrcommon::BLOB::Reference ref = payload.getBLOB(); 01598 ibrcommon::BLOB::iostream stream = ref.iostream(); 01599 01600 const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref.getPointer()); 01601 01602 // first remove the tmp file 01603 tmpfile.remove(); 01604 01605 // make a hardlink to the origin blob file 01606 if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 ) 01607 { 01608 tmpfile = ibrcommon::TemporaryFile(_blockPath, "block"); 01609 throw ibrcommon::Exception("hard-link failed"); 01610 } 01611 } catch (const std::bad_cast&) { 01612 throw ibrcommon::Exception("not a Payload or SQLiteBLOB"); 01613 } 01614 01615 storedBytes += _blockPath.size(); 01616 } catch (const ibrcommon::Exception&) { 01617 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary); 01618 dtn::data::SeparateSerializer serializer(filestream); 01619 serializer << block; 01620 storedBytes += serializer.getLength(block); 01621 filestream.close(); 01622 } 01623 01624 // protect this query from concurrent access and enable the auto-reset feature 01625 AutoResetLock l(_locks[BLOCK_STORE], _statements[BLOCK_STORE]); 01626 01627 // set bundle key data 01628 set_bundleid(_statements[BLOCK_STORE], id); 01629 01630 // set the column four to null if this is not a fragment 01631 if (!id.fragment) sqlite3_bind_null(_statements[BLOCK_STORE], 4); 01632 01633 // set the block type 01634 sqlite3_bind_int(_statements[BLOCK_STORE], 5, blocktyp); 01635 01636 // the filename of the block data 01637 sqlite3_bind_text(_statements[BLOCK_STORE], 6, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT); 01638 01639 // the ordering number 01640 sqlite3_bind_int(_statements[BLOCK_STORE], 7, blocknumber); 01641 01642 // execute the query and store the block in the database 01643 if (sqlite3_step(_statements[BLOCK_STORE]) != SQLITE_DONE) 01644 { 01645 throw SQLiteQueryException("can not store block of bundle"); 01646 } 01647 01648 //increment blocknumber 01649 blocknumber++; 01650 } 01651 01652 return storedBytes; 01653 } 01654 01655 void SQLiteBundleStorage::get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id) 01656 { 01657 int err = 0; 01658 string file; 01659 01660 // select the right statement to use 01661 const size_t stmt_key = id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID; 01662 01663 AutoResetLock l(_locks[stmt_key], _statements[stmt_key]); 01664 01665 // set the bundle key values 01666 set_bundleid(_statements[stmt_key], id); 01667 01668 // query the database and step through all blocks 01669 while ((err = sqlite3_step(_statements[stmt_key])) == SQLITE_ROW) 01670 { 01671 const ibrcommon::File f( (const char*) sqlite3_column_text(_statements[stmt_key], 0) ); 01672 int blocktyp = sqlite3_column_int(_statements[stmt_key], 1); 01673 01674 // open the file 01675 std::ifstream is(f.getPath().c_str(), std::ios::binary | std::ios::in); 01676 01677 if (blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE) 01678 { 01679 // create a new BLOB object 01680 SQLiteBLOB *blob = new SQLiteBLOB(_blobPath); 01681 01682 // remove the corresponding file 01683 blob->_file.remove(); 01684 01685 // generate a hardlink, pointing to the BLOB file 01686 if ( ::link(f.getPath().c_str(), blob->_file.getPath().c_str()) == 0) 01687 { 01688 // create a reference of the BLOB 01689 ibrcommon::BLOB::Reference ref(blob); 01690 01691 // add payload block to the bundle 01692 bundle.push_back(ref); 01693 } 01694 else 01695 { 01696 delete blob; 01697 IBRCOMMON_LOGGER(error) << "unable to load bundle: failed to create a hard-link" << IBRCOMMON_LOGGER_ENDL; 01698 } 01699 } 01700 else 01701 { 01702 // read the block 01703 dtn::data::Block &block = dtn::data::SeparateDeserializer(is, bundle).readBlock(); 01704 01705 // close the file 01706 is.close(); 01707 01708 // modify the age block if present 01709 try { 01710 dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block); 01711 01712 // modify the AgeBlock with the age of the file 01713 time_t age = f.lastaccess() - f.lastmodify(); 01714 01715 agebl.addSeconds(age); 01716 } catch (const std::bad_cast&) { }; 01717 } 01718 } 01719 01720 if (err == SQLITE_DONE) 01721 { 01722 if (bundle.getBlocks().size() == 0) 01723 { 01724 IBRCOMMON_LOGGER(error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL; 01725 throw SQLiteQueryException("no blocks found"); 01726 } 01727 } 01728 else 01729 { 01730 IBRCOMMON_LOGGER(error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL; 01731 throw SQLiteQueryException("can not query for blocks"); 01732 } 01733 } 01734 01735 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage) 01736 { 01737 // until IDLE is false 01738 while (true) 01739 { 01740 /* 01741 * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space. 01742 * This empty space will be reused the next time new information is added to the database. But in the meantime, 01743 * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can 01744 * cause the information in the database to become fragmented - scrattered out all across the database file rather 01745 * than clustered together in one place. 01746 * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous, 01747 * and otherwise cleans up the database file structure. 01748 */ 01749 { 01750 AutoResetLock l(storage._locks[VACUUM], storage._statements[VACUUM]); 01751 sqlite3_step(storage._statements[VACUUM]); 01752 } 01753 01754 // here we can do some IDLE stuff... 01755 ::sleep(1); 01756 01757 ibrcommon::MutexLock l(TaskIdle::_mutex); 01758 if (!TaskIdle::_idle) return; 01759 } 01760 } 01761 01762 const std::string SQLiteBundleStorage::getName() const 01763 { 01764 return "SQLiteBundleStorage"; 01765 } 01766 01767 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id) 01768 { 01769 // custody is successful transferred to another node. 01770 // it is safe to delete this bundle now. (depending on the routing algorithm.) 01771 // update the custodian of this bundle with the new one 01772 update_custodian(id, custodian); 01773 } 01774 01775 void SQLiteBundleStorage::new_expire_time(size_t ttl) 01776 { 01777 ibrcommon::MutexLock l(_next_expiration_lock); 01778 if (_next_expiration == 0 || ttl < _next_expiration) 01779 { 01780 _next_expiration = ttl; 01781 } 01782 } 01783 01784 void SQLiteBundleStorage::reset_expire_time() 01785 { 01786 ibrcommon::MutexLock l(_next_expiration_lock); 01787 _next_expiration = 0; 01788 } 01789 01790 size_t SQLiteBundleStorage::get_expire_time() 01791 { 01792 ibrcommon::MutexLock l(_next_expiration_lock); 01793 return _next_expiration; 01794 } 01795 01796 void SQLiteBundleStorage::add_deletion(const dtn::data::BundleID &id) 01797 { 01798 ibrcommon::MutexLock l(_deletion_mutex); 01799 _deletion_list.insert(id); 01800 } 01801 01802 void SQLiteBundleStorage::remove_deletion(const dtn::data::BundleID &id) 01803 { 01804 ibrcommon::MutexLock l(_deletion_mutex); 01805 _deletion_list.erase(id); 01806 } 01807 01808 bool SQLiteBundleStorage::contains_deletion(const dtn::data::BundleID &id) 01809 { 01810 ibrcommon::MutexLock l(_deletion_mutex); 01811 return (_deletion_list.find(id) != _deletion_list.end()); 01812 } 01813 01814 void SQLiteBundleStorage::trylock() throw (ibrcommon::MutexException) 01815 { 01816 // not implemented 01817 throw ibrcommon::MutexException(); 01818 } 01819 01820 void SQLiteBundleStorage::enter() throw (ibrcommon::MutexException) 01821 { 01822 // lock all statements 01823 for (int i = 0; i < SQL_QUERIES_END; i++) 01824 { 01825 // lock the statement 01826 _locks[i].enter(); 01827 } 01828 } 01829 01830 void SQLiteBundleStorage::leave() throw (ibrcommon::MutexException) 01831 { 01832 // unlock all statements 01833 for (int i = 0; i < SQL_QUERIES_END; i++) 01834 { 01835 // unlock the statement 01836 _locks[i].leave(); 01837 } 01838 } 01839 01840 void SQLiteBundleStorage::set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset) const 01841 { 01842 const std::string source_id = id.source.getString(); 01843 sqlite3_bind_text(st, offset + 1, source_id.c_str(), source_id.length(), SQLITE_TRANSIENT); 01844 sqlite3_bind_int64(st, offset + 2, id.timestamp); 01845 sqlite3_bind_int64(st, offset + 3, id.sequencenumber); 01846 01847 if (id.fragment) 01848 { 01849 sqlite3_bind_int64(st, offset + 4, id.offset); 01850 } 01851 } 01852 01853 void SQLiteBundleStorage::get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset) const 01854 { 01855 id.source = dtn::data::EID((const char*)sqlite3_column_text(st, offset + 0)); 01856 id.timestamp = sqlite3_column_int64(st, offset + 1); 01857 id.sequencenumber = sqlite3_column_int64(st, offset + 2); 01858 01859 id.fragment = (sqlite3_column_text(st, offset + 3) != NULL); 01860 id.offset = sqlite3_column_int64(st, offset + 3); 01861 } 01862 } 01863 }