• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/core/SQLiteBundleStorage.cpp

Go to the documentation of this file.
00001 /*
00002  * SQLiteBundleStorage.cpp
00003  *
00004  *  Created on: 09.01.2010
00005  *      Author: Myrtus
00006  */
00007 
00008 
00009 #include "core/SQLiteBundleStorage.h"
00010 #include "core/TimeEvent.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "core/SQLiteConfigure.h"
00014 #include "core/BundleEvent.h"
00015 #include "core/BundleExpiredEvent.h"
00016 
00017 #include <ibrdtn/data/PayloadBlock.h>
00018 #include <ibrdtn/data/Serializer.h>
00019 #include <ibrdtn/data/Bundle.h>
00020 #include <ibrdtn/data/BundleID.h>
00021 
00022 #include <ibrcommon/thread/MutexLock.h>
00023 #include <ibrcommon/data/BLOB.h>
00024 #include <ibrcommon/AutoDelete.h>
00025 #include <ibrcommon/Logger.h>
00026 
00027 namespace dtn
00028 {
00029         namespace core
00030         {
00031                 void sql_tracer(void*, const char * pQuery)
00032                 {
00033                         IBRCOMMON_LOGGER_DEBUG(50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
00034                 }
00035 
00036                 const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] =
00037                                 { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes" };
00038 
00039                 const std::string SQLiteBundleStorage::_sql_queries[SQL_QUERIES_END] =
00040                 {
00041                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC LIMIT ?,?;",
00042                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL LIMIT 1;",
00043                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;",
00044                         "SELECT * FROM " + _tables[SQL_TABLE_BUNDLE] + " WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset != NULL ORDER BY fragmentoffset ASC;",
00045 
00046                         "SELECT source, timestamp, sequencenumber, fragmentoffset, procflags FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00047                         "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.expiretime <= ?;",
00048                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00049                         "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;",
00050 
00051                         "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;",
00052                         "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00053 
00054                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00055                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00056                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00057                         "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);",
00058                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00059                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00060 
00061                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00062 
00063                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL ORDER BY ordernumber ASC;",
00064                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;",
00065                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL AND ordernumber = ?;",
00066                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;",
00067                         "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";",
00068                         "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);",
00069 
00070 #ifdef SQLITE_STORAGE_EXTENDED
00071                         "SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00072                         "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00073                         "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";",
00074                         "INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);",
00075 
00076                         "SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00077                         "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00078                         "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";",
00079                         "INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);",
00080 
00081                         "SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00082                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00083                         "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);",
00084 #endif
00085 
00086                         "VACUUM;"
00087                 };
00088 
00089 //              const std::string SQLiteBundleStorage::_db_structure[11] =
00090 //              {
00091 //                      "create table if not exists "+ _tables[SQL_TABLE_BUNDLE] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, Source text, Destination text, Reportto text, Custodian text, ProcFlags int, Timestamp int, Sequencenumber int, Lifetime int, Fragmentoffset int, Appdatalength int, TTL int, Size int, Payloadlength int, Payloadfilename text);",
00092 //                      "create table if not exists "+ _tables[SQL_TABLE_BLOCK] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, BlockType int, Filename text, Blocknumber int);",
00093 //                      "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
00094 //                      "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
00095 //                      "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
00096 //                      "CREATE TRIGGER IF NOT EXISTS Blockdelete AFTER DELETE ON "+ _tables[SQL_TABLE_BUNDLE] +" FOR EACH ROW BEGIN DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE BundleID = OLD.BundleID; DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = OLD.BundleID; END;",
00097 //                      "CREATE TRIGGER IF NOT EXISTS BundleRoutingdelete BEFORE INSERT ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" BEGIN SELECT CASE WHEN (SELECT BundleID FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = NEW.BundleID) IS NULL THEN RAISE(ABORT, \"Foreign Key Violation: BundleID doesn't exist in BundleTable\")END; END;",
00098 //                      "CREATE INDEX IF NOT EXISTS BlockIDIndex ON "+ _tables[SQL_TABLE_BLOCK] +" (BundleID);",
00099 //                      "CREATE UNIQUE INDEX IF NOT EXISTS BundleIDIndex ON "+ _tables[SQL_TABLE_BUNDLE] +" (BundleID);",
00100 //                      "CREATE UNIQUE INDEX IF NOT EXISTS RoutingKeyIndex ON "+ _tables[SQL_TABLE_ROUTING] +" (Key);",
00101 //                      "CREATE UNIQUE INDEX IF NOT EXISTS BundleRoutingIDIndex ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID,Key);",
00102 //              };
00103 
00104                 const std::string SQLiteBundleStorage::_db_structure[10] =
00105                 {
00106                         "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
00107                         "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `appdatalength` INTEGER DEFAULT NULL, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL);",
00108                         "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
00109                         "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
00110                         "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
00111                         "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND ((" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset IS NULL AND old.fragmentoffset IS NULL) OR (" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset)); END;",
00112                         "CREATE UNIQUE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);",
00113                         "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);",
00114                         "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);",
00115                         "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);"
00116                         "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);"
00117                 };
00118 
00119                 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
00120                 bool SQLiteBundleStorage::TaskIdle::_idle = false;
00121 
00122                 SQLiteBundleStorage::SQLBundleQuery::SQLBundleQuery()
00123                  : _statement(NULL)
00124                 { }
00125 
00126                 SQLiteBundleStorage::SQLBundleQuery::~SQLBundleQuery()
00127                 {
00128                         // free the statement if used before
00129                         if (_statement != NULL)
00130                         {
00131                                 sqlite3_finalize(_statement);
00132                         }
00133                 }
00134 
00135                 SQLiteBundleStorage::AutoResetLock::AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st)
00136                  : _lock(mutex), _st(st)
00137                 {
00138                         //sqlite3_clear_bindings(st);
00139                 }
00140 
00141                 SQLiteBundleStorage::AutoResetLock::~AutoResetLock()
00142                 {
00143                         sqlite3_reset(_st);
00144                 }
00145 
00146                 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size)
00147                  : dbPath(path), dbFile(path.get("sqlite.db")), dbSize(size), _next_expiration(0)
00148                 {
00149                         //Configure SQLite Library
00150                         SQLiteConfigure::configure();
00151 
00152                         // check if SQLite is thread-safe
00153                         if (sqlite3_threadsafe() == 0)
00154                         {
00155                                 IBRCOMMON_LOGGER(critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
00156                                 throw ibrcommon::Exception("need threading support in sqlite!");
00157                         }
00158                 }
00159 
00160                 SQLiteBundleStorage::~SQLiteBundleStorage()
00161                 {
00162                         stop();
00163                         join();
00164 
00165                         ibrcommon::MutexLock l(*this);
00166 
00167                         // free all statements
00168                         for (int i = 0; i < SQL_QUERIES_END; i++)
00169                         {
00170                                 // prepare the statement
00171                                 sqlite3_finalize(_statements[i]);
00172                         }
00173 
00174                         //close Databaseconnection
00175                         if (sqlite3_close(_database) != SQLITE_OK)
00176                         {
00177                                 IBRCOMMON_LOGGER(error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL;
00178                         }
00179 
00180                         // shutdown sqlite library
00181                         SQLiteConfigure::shutdown();
00182                 }
00183 
00184                 void SQLiteBundleStorage::openDatabase(const ibrcommon::File &path)
00185                 {
00186                         ibrcommon::MutexLock l(*this);
00187 
00188                         // set the block path
00189                         _blockPath = dbPath.get(_tables[SQL_TABLE_BLOCK]);
00190 
00191                         //open the database
00192                         if (sqlite3_open_v2(path.getPath().c_str(), &_database,  SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
00193                         {
00194                                 IBRCOMMON_LOGGER(error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
00195                                 sqlite3_close(_database);
00196                                 throw ibrcommon::Exception("Unable to open sqlite database");
00197                         }
00198 
00199                         int err = 0;
00200 
00201                         // create all tables
00202                         for (size_t i = 0; i < 10; i++)
00203                         {
00204                                 sqlite3_stmt *st = prepare(_db_structure[i]);
00205                                 err = sqlite3_step(st);
00206                                 if(err != SQLITE_DONE)
00207                                 {
00208                                         IBRCOMMON_LOGGER(error) << "SQLiteBundleStorage: failed to create database" << err << IBRCOMMON_LOGGER_ENDL;
00209                                 }
00210                                 sqlite3_reset(st);
00211                                 sqlite3_finalize(st);
00212                         }
00213 
00214                         // create the bundle folder
00215                         ibrcommon::File::createDirectory( _blockPath );
00216 
00217                         // prepare all statements
00218                         for (int i = 0; i < SQL_QUERIES_END; i++)
00219                         {
00220                                 // prepare the statement
00221                                 int err = sqlite3_prepare_v2(_database, _sql_queries[i].c_str(), _sql_queries[i].length(), &_statements[i], 0);
00222                                 if ( err != SQLITE_OK )
00223                                 {
00224                                         IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failed to prepare statement: " << err << " with query: " << _sql_queries[i] << IBRCOMMON_LOGGER_ENDL;
00225                                 }
00226                         }
00227 
00228                         // disable synchronous mode
00229                         sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL);
00230 
00231                         // enable sqlite tracing if debug level is higher than 50
00232                         if (IBRCOMMON_LOGGER_LEVEL >= 50)
00233                         {
00234                                 sqlite3_trace(_database, &sql_tracer, NULL);
00235                         }
00236                 }
00237 
00238                 void SQLiteBundleStorage::componentRun()
00239                 {
00240                         // loop until aborted
00241                         try {
00242                                 while (true)
00243                                 {
00244                                         Task *t = _tasks.getnpop(true);
00245 
00246                                         try {
00247                                                 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
00248                                                 try {
00249                                                         btask.run(*this);
00250                                                 } catch (const std::exception&) {
00251                                                         btask.abort();
00252                                                         continue;
00253                                                 };
00254                                                 btask.done();
00255                                                 continue;
00256                                         } catch (const std::bad_cast&) { };
00257 
00258                                         try {
00259                                                 ibrcommon::AutoDelete<Task> killer(t);
00260                                                 t->run(*this);
00261                                         } catch (const std::exception&) { };
00262                                 }
00263                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00264                                 // we are aborted, abort all blocking tasks
00265                         }
00266                 }
00267 
00268                 void SQLiteBundleStorage::componentUp()
00269                 {
00270                         //register Events
00271                         bindEvent(TimeEvent::className);
00272                         bindEvent(GlobalEvent::className);
00273 
00274                         // open the database and create all folders and files if needed
00275                         openDatabase(dbFile);
00276 
00277                         //Check if the database is consistent with the filesystem
00278                         check_consistency();
00279 
00280                         // calculate next Bundleexpiredtime
00281                         update_expire_time();
00282                 };
00283 
00284                 void SQLiteBundleStorage::componentDown()
00285                 {
00286                         //unregister Events
00287                         unbindEvent(TimeEvent::className);
00288                         unbindEvent(GlobalEvent::className);
00289                 };
00290 
00291                 bool SQLiteBundleStorage::__cancellation()
00292                 {
00293                         _tasks.abort();
00294                         return true;
00295                 }
00296 
00297                 void SQLiteBundleStorage::check_consistency()
00298                 {
00299                         /*
00300                          * check if the database is consistent with the files on the HDD. Therefore every file of the database is put in a list.
00301                          * When the files of the database are scanned it is checked if the actual file is in the list of files of the filesystem. if it is so the entry
00302                          * of the list of files of the filesystem is deleted. if not the file is put in a seperate list. After this procedure there are 2 lists containing file
00303                          * which are inconsistent and can be deleted.
00304                          */
00305                         int err;
00306                         size_t timestamp, sequencenumber, offset, pFlags, appdata, lifetime;
00307                         set<string> blockFiles,payloadfiles;
00308                         string datei;
00309 
00310                         list<ibrcommon::File> blist;
00311                         list<ibrcommon::File>::iterator file_it;
00312 
00313                         //1. Durchsuche die Dateien
00314                         _blockPath.getFiles(blist);
00315                         for(file_it = blist.begin(); file_it != blist.end(); file_it++)
00316                         {
00317                                 datei = (*file_it).getPath();
00318                                 if(datei[datei.size()-1] != '.')
00319                                 {
00320                                         if(((*file_it).getBasename())[0] == 'b')
00321                                         {
00322                                                 blockFiles.insert(datei);
00323                                         }
00324                                         else
00325                                                 payloadfiles.insert(datei);
00326                                 }
00327                         }
00328 
00329                         //3. Check consistency of the bundles
00330                         check_bundles(blockFiles);
00331 
00332                         //4. Check consistency of the fragments
00333                         check_fragments(payloadfiles);
00334                 }
00335 
00336                 void SQLiteBundleStorage::check_fragments(std::set<std::string> &payloadFiles)
00337                 {
00338                         int err;
00339                         size_t procFlags, timestamp, sequencenumber, appdatalength, lifetime;
00340                         string filename, source, dest, custody, repto;
00341                         set<string> consistenFiles, inconsistenSources;
00342                         set<size_t> inconsistentTimestamp, inconsistentSeq_number;
00343                         set<string>::iterator file_it, consisten_it;
00344 
00345                         sqlite3_stmt *getPayloadfiles = prepare("SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+_tables[SQL_TABLE_BUNDLE]+" WHERE fragmentoffset != NULL;");
00346 
00347                         for(err = sqlite3_step(getPayloadfiles); err == SQLITE_ROW; err = sqlite3_step(getPayloadfiles))
00348                         {
00349                                 filename = (const char*)sqlite3_column_text(getPayloadfiles,10);
00350                                 file_it = payloadFiles.find(filename);
00351 
00352 
00353                                 if (file_it == payloadFiles.end())
00354                                 {
00355                                         consisten_it = consistenFiles.find(*file_it);
00356 
00357                                         //inconsistent DB entry
00358                                         if(consisten_it == consistenFiles.end()){
00359                                                 //Generate Report
00360                                                 source  = (const char*) sqlite3_column_text(getPayloadfiles,0);
00361                                                 dest    = (const char*) sqlite3_column_text(getPayloadfiles,1);
00362                                                 repto   = (const char*) sqlite3_column_text(getPayloadfiles,2);
00363                                                 custody = (const char*) sqlite3_column_text(getPayloadfiles,3);
00364                                                 procFlags = sqlite3_column_int64(getPayloadfiles,4);
00365                                                 timestamp = sqlite3_column_int64(getPayloadfiles,5);
00366                                                 sequencenumber = sqlite3_column_int64(getPayloadfiles,6);
00367                                                 lifetime  = sqlite3_column_int64(getPayloadfiles,7);
00368                                                 appdatalength  = sqlite3_column_int64(getPayloadfiles,9);
00369 
00370                                                 dtn::data::BundleID id(dtn::data::EID(source),timestamp,sequencenumber);
00371                                                 dtn::data::MetaBundle mb(id, lifetime, dtn::data::DTNTime(), dtn::data::EID(dest), dtn::data::EID(repto), dtn::data::EID(custody), appdatalength, procFlags);
00372                                                 dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00373                                         }
00374                                 }
00375                                 //consistent DB entry
00376                                 else
00377                                         consistenFiles.insert(*file_it);
00378                                         payloadFiles.erase(file_it);
00379                         }
00380 
00381                         sqlite3_reset(getPayloadfiles);
00382                         sqlite3_finalize(getPayloadfiles);
00383                 }
00384 
00385                 void SQLiteBundleStorage::check_bundles(std::set<std::string> &blockFiles)
00386                 {
00387                         std::set<dtn::data::BundleID> corrupt_bundle_ids;
00388 
00389                         sqlite3_stmt *blockConistencyCheck = prepare("SELECT source_id, timestamp, sequencenumber, fragmentoffset, filename, ordernumber FROM "+ _tables[SQL_TABLE_BLOCK] +";");
00390 
00391                         while (sqlite3_step(blockConistencyCheck) == SQLITE_ROW)
00392                         {
00393                                 dtn::data::BundleID id;
00394 
00395                                 // get the bundleid
00396                                 get_bundleid(blockConistencyCheck, id);
00397 
00398                                 // get the filename
00399                                 std::string filename = (const char*)sqlite3_column_text(blockConistencyCheck, 4);
00400 
00401                                 // if the filename is not in the list of block files
00402                                 if (blockFiles.find(filename) == blockFiles.end())
00403                                 {
00404                                         // add the bundle to the deletion list
00405                                         corrupt_bundle_ids.insert(id);
00406                                 }
00407                                 else
00408                                 {
00409                                         // file is available, everything is fine
00410                                         blockFiles.erase(filename);
00411                                 }
00412                         }
00413                         sqlite3_reset(blockConistencyCheck);
00414                         sqlite3_finalize(blockConistencyCheck);
00415 
00416                         for(std::set<dtn::data::BundleID>::const_iterator iter = corrupt_bundle_ids.begin(); iter != corrupt_bundle_ids.end(); iter++)
00417                         {
00418                                 const dtn::data::BundleID &id = (*iter);
00419 
00420                                 // get meta data of this bundle
00421                                 const dtn::data::MetaBundle m = get_meta(id);
00422 
00423                                 // remove the hole bundle
00424                                 remove(id);
00425                         }
00426 
00427                         // delete block files still in the blockfile list
00428                         for (std::set<std::string>::const_iterator iter = blockFiles.begin(); iter != blockFiles.end(); iter++)
00429                         {
00430                                 ibrcommon::File blockfile(*iter);
00431                                 blockfile.remove();
00432                         }
00433                 }
00434 
00435 
00436                 sqlite3_stmt* SQLiteBundleStorage::prepare(const std::string &sqlquery)
00437                 {
00438                         // prepare the statement
00439                         sqlite3_stmt *statement;
00440                         int err = sqlite3_prepare_v2(_database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00441                         if ( err != SQLITE_OK )
00442                         {
00443                                 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << IBRCOMMON_LOGGER_ENDL;
00444                         }
00445                         return statement;
00446                 }
00447 
00448                 dtn::data::MetaBundle SQLiteBundleStorage::get_meta(const dtn::data::BundleID &id)
00449                 {
00450                         dtn::data::MetaBundle bundle;
00451 
00452                         // check if the bundle is already on the deletion list
00453                         if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00454 
00455                         size_t stmt_key = BUNDLE_GET_ID;
00456                         if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00457 
00458                         // lock the prepared statement
00459                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00460 
00461                         // bind bundle id to the statement
00462                         set_bundleid(_statements[stmt_key], id);
00463 
00464                         // execute the query and check for error
00465                         if (sqlite3_step(_statements[stmt_key]) != SQLITE_ROW)
00466                         {
00467                                 stringstream error;
00468                                 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString();
00469                                 IBRCOMMON_LOGGER_DEBUG(15) << error.str() << IBRCOMMON_LOGGER_ENDL;
00470                                 throw SQLiteQueryException(error.str());
00471                         }
00472 
00473                         // query bundle data
00474                         get(_statements[stmt_key], bundle);
00475 
00476                         return bundle;
00477                 }
00478 
00479                 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset)
00480                 {
00481                         bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00482                         bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00483                         bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00484                         bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00485                         bundle.procflags = sqlite3_column_int(st, offset + 4);
00486                         bundle.timestamp = sqlite3_column_int64(st, offset + 5);
00487                         bundle.sequencenumber = sqlite3_column_int64(st, offset + 6);
00488                         bundle.lifetime = sqlite3_column_int64(st, offset + 7);
00489 
00490                         if (bundle.procflags & data::Bundle::FRAGMENT)
00491                         {
00492                                 bundle.offset = sqlite3_column_int64(st, offset + 8);
00493                                 bundle.appdatalength = sqlite3_column_int64(st, offset + 9);
00494                         }
00495                 }
00496 
00497                 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset)
00498                 {
00499                         bundle._source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00500                         bundle._destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00501                         bundle._reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00502                         bundle._custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00503                         bundle._procflags = sqlite3_column_int(st, offset + 4);
00504                         bundle._timestamp = sqlite3_column_int64(st, offset + 5);
00505                         bundle._sequencenumber = sqlite3_column_int64(st, offset + 6);
00506                         bundle._lifetime = sqlite3_column_int64(st, offset + 7);
00507 
00508                         if (bundle._procflags & data::Bundle::FRAGMENT)
00509                         {
00510                                 bundle._fragmentoffset = sqlite3_column_int64(st, offset + 8);
00511                                 bundle._appdatalength = sqlite3_column_int64(st, offset + 9);
00512                         }
00513                 }
00514 
00515                 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb)
00516                 {
00517                         std::list<dtn::data::MetaBundle> ret;
00518 
00519                         const std::string base_query =
00520                                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE];
00521 
00522                         sqlite3_stmt *st = NULL;
00523                         size_t bind_offset = 1;
00524 
00525                         try {
00526                                 SQLBundleQuery &query = dynamic_cast<SQLBundleQuery&>(cb);
00527 
00528                                 if (query._statement == NULL)
00529                                 {
00530                                         std::string sql = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC LIMIT ?,?;";
00531                                         query._statement = prepare(sql);
00532                                 }
00533 
00534                                 st = query._statement;
00535                                 bind_offset = query.bind(st, 1);
00536                         } catch (const std::bad_cast&) {
00537                                 // this query is not optimized for sql, use the generic way (slow)
00538                                 st = _statements[BUNDLE_GET_FILTER];
00539                         };
00540 
00541                         int offset = 0;
00542                         while (ret.size() != cb.limit())
00543                         {
00544                                 // lock the database
00545                                 AutoResetLock l(_locks[BUNDLE_GET_FILTER], st);
00546 
00547                                 sqlite3_bind_int64(st, bind_offset, offset);
00548                                 sqlite3_bind_int64(st, bind_offset + 1, cb.limit());
00549 
00550                                 if (sqlite3_step(st) == SQLITE_DONE)
00551                                 {
00552                                         // returned no result
00553                                         break;
00554                                 }
00555 
00556                                 while (ret.size() != cb.limit())
00557                                 {
00558                                         dtn::data::MetaBundle m;
00559 
00560                                         // extract the primary values and set them in the bundle object
00561                                         get(st, m, 0);
00562 
00563                                         // check if the bundle is already on the deletion list
00564                                         if (!contains_deletion(m))
00565                                         {
00566                                                 // ask the filter if this bundle should be added to the return list
00567                                                 if (cb.shouldAdd(m))
00568                                                 {
00569                                                         IBRCOMMON_LOGGER_DEBUG(40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL;
00570 
00571                                                         // add the bundle to the list
00572                                                         ret.push_back(m);
00573                                                 }
00574                                         }
00575 
00576                                         if (sqlite3_step(st) != SQLITE_ROW)
00577                                         {
00578                                                 break;
00579                                         }
00580                                 }
00581 
00582                                 // increment the offset, because we might not have enough
00583                                 offset += cb.limit();
00584                         }
00585 
00586                         return ret;
00587                 }
00588 
00589                 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id)
00590                 {
00591                         dtn::data::Bundle bundle;
00592                         int err = 0;
00593 
00594                         IBRCOMMON_LOGGER_DEBUG(25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00595 
00596                         // if bundle is deleted?
00597                         if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00598 
00599                         size_t stmt_key = BUNDLE_GET_ID;
00600                         if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00601 
00602                         // do this while db is locked
00603                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00604 
00605                         // set the bundle key values
00606                         set_bundleid(_statements[stmt_key], id);
00607 
00608                         // execute the query and check for error
00609                         if ((err = sqlite3_step(_statements[stmt_key])) != SQLITE_ROW)
00610                         {
00611                                 IBRCOMMON_LOGGER_DEBUG(15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00612                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00613                         }
00614 
00615                         // read bundle data
00616                         get(_statements[stmt_key], bundle);
00617 
00618                         try {
00619                                 // read all blocks
00620                                 get_blocks(bundle, id);
00621                         } catch (const ibrcommon::Exception &ex) {
00622                                 IBRCOMMON_LOGGER(error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00623                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00624                         }
00625 
00626                         return bundle;
00627                 }
00628 
00629 #ifdef SQLITE_STORAGE_EXTENDED
00630                 void SQLiteBundleStorage::setPriority(const int priority, const dtn::data::BundleID &id)
00631                 {
00632                         int err;
00633                         size_t procflags;
00634                         {
00635                                 sqlite3_bind_text(_statements[PROCFLAGS_GET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00636                                 err = sqlite3_step(_statements[PROCFLAGS_GET]);
00637                                 if(err == SQLITE_ROW)
00638                                         procflags = sqlite3_column_int(_statements[PROCFLAGS_GET],0);
00639                                 sqlite3_reset(_statements[PROCFLAGS_GET]);
00640                                 if(err != SQLITE_DONE){
00641                                         stringstream error;
00642                                         error << "SQLiteBundleStorage: error while Select Querry: " << err;
00643                                         IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00644                                         throw SQLiteQueryException(error.str());
00645                                 }
00646 
00647                                 //Priority auf 0 Setzen
00648                                 procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2));
00649 
00650                                 //Set Prioritybits
00651                                 switch(priority){
00652                                 case 1: procflags += data::Bundle::PRIORITY_BIT1; break; //Set PRIORITY_BIT1
00653                                 case 2: procflags += data::Bundle::PRIORITY_BIT2; break;
00654                                 case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break;
00655                                 }
00656 
00657                                 sqlite3_bind_text(_statements[PROCFLAGS_SET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00658                                 sqlite3_bind_int64(_statements[PROCFLAGS_SET],2,priority);
00659                                 sqlite3_step(_statements[PROCFLAGS_SET]);
00660                                 sqlite3_reset(_statements[PROCFLAGS_SET]);
00661 
00662                                 if(err != SQLITE_DONE)
00663                                 {
00664                                         stringstream error;
00665                                         error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err;
00666                                         IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00667                                         throw SQLiteQueryException(error.str());
00668                                 }
00669                         }
00670                 }
00671 #endif
00672 
00673                 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle)
00674                 {
00675                         IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00676 
00677                         // determine if the bundle is for local delivery
00678                         bool local = (bundle._destination.sameHost(BundleCore::local)
00679                                         && (bundle._procflags & dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON));
00680 
00681 //                      // if the bundle is a fragment store it as one
00682 //                      if (bundle.get(dtn::data::Bundle::FRAGMENT) && local)
00683 //                      {
00684 //                              storeFragment(bundle);
00685 //                              return;
00686 //                      }
00687 
00688                         int err;
00689 
00690                         const dtn::data::EID _sourceid = bundle._source;
00691                         size_t TTL = bundle._timestamp + bundle._lifetime;
00692 
00693                         AutoResetLock l(_locks[BUNDLE_STORE], _statements[BUNDLE_STORE]);
00694 
00695                         set_bundleid(_statements[BUNDLE_STORE], bundle);
00696 
00697                         sqlite3_bind_text(_statements[BUNDLE_STORE], 5, bundle._source.getString().c_str(), bundle._source.getString().length(), SQLITE_TRANSIENT);
00698                         sqlite3_bind_text(_statements[BUNDLE_STORE], 6, bundle._destination.getString().c_str(), bundle._destination.getString().length(), SQLITE_TRANSIENT);
00699                         sqlite3_bind_text(_statements[BUNDLE_STORE], 7, bundle._reportto.getString().c_str(), bundle._reportto.getString().length(), SQLITE_TRANSIENT);
00700                         sqlite3_bind_text(_statements[BUNDLE_STORE], 8, bundle._custodian.getString().c_str(), bundle._custodian.getString().length(), SQLITE_TRANSIENT);
00701                         sqlite3_bind_int(_statements[BUNDLE_STORE], 9, bundle._procflags);
00702                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 10, bundle._lifetime);
00703 
00704                         if (bundle.get(dtn::data::Bundle::FRAGMENT))
00705                         {
00706                                 sqlite3_bind_int64(_statements[BUNDLE_STORE], 11, bundle._appdatalength);
00707                         }
00708                         else
00709                         {
00710                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 4);
00711                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 11);
00712                         }
00713 
00714                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 12, TTL);
00715                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 13, dtn::data::MetaBundle(bundle).getPriority());
00716 
00717                         // start a transaction
00718                         sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, NULL);
00719 
00720                         // store the bundle data in the database
00721                         err = sqlite3_step(_statements[BUNDLE_STORE]);
00722 
00723                         if (err == SQLITE_CONSTRAINT)
00724                         {
00725                                 IBRCOMMON_LOGGER(warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
00726                         }
00727                         else if (err != SQLITE_DONE)
00728                         {
00729                                 stringstream error;
00730                                 error << "SQLiteBundleStorage: store() failure: " << err << " " <<  sqlite3_errmsg(_database);
00731                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00732 
00733                                 // rollback the whole transaction
00734                                 sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, NULL);
00735 
00736                                 throw SQLiteQueryException(error.str());
00737                         }
00738                         else
00739                         {
00740                                 // stores the blocks to a file
00741                                 store_blocks(bundle);
00742 
00743                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
00744                         }
00745 
00746                         // commit the transaction
00747                         sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, NULL);
00748 
00749                         // set new expire time
00750                         new_expire_time(TTL);
00751 
00752                         try {
00753                                 // the bundle is stored sucessfully, we could accept custody if it is requested
00754                                 const dtn::data::EID custodian = acceptCustody(bundle);
00755 
00756                                 // update the custody address of this bundle
00757                                 update_custodian(bundle, custodian);
00758                         } catch (const ibrcommon::Exception&) {
00759                                 // this bundle has no request for custody transfers
00760                         }
00761                 }
00762 
00763                 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id)
00764                 {
00765                         add_deletion(id);
00766                         _tasks.push(new TaskRemove(id));
00767                 }
00768 
00769                 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage)
00770                 {
00771                         {
00772                                 // select the right statement to use
00773                                 const size_t stmt_key = _id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
00774 
00775                                 // lock the database
00776                                 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00777 
00778                                 // set the bundle key values
00779                                 storage.set_bundleid(storage._statements[stmt_key], _id);
00780 
00781                                 // step through all blocks
00782                                 while (sqlite3_step(storage._statements[stmt_key]) == SQLITE_ROW)
00783                                 {
00784                                         // delete each referenced block file
00785                                         ibrcommon::File blockfile( (const char*)sqlite3_column_text(storage._statements[stmt_key], 0) );
00786                                         blockfile.remove();
00787                                 }
00788                         }
00789 
00790                         {
00791                                 const size_t stmt_key = _id.fragment ? FRAGMENT_DELETE : BUNDLE_DELETE;
00792 
00793                                 // lock the database
00794                                 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00795 
00796                                 // then remove the bundle data
00797                                 storage.set_bundleid(storage._statements[stmt_key], _id);
00798                                 sqlite3_step(storage._statements[stmt_key]);
00799 
00800                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << _id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL;
00801                         }
00802 
00803                         //update deprecated timer
00804                         storage.update_expire_time();
00805 
00806                         // remove it from the deletion list
00807                         storage.remove_deletion(_id);
00808                 }
00809 
00810 #ifdef SQLITE_STORAGE_EXTENDED
00811                 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00812                 {
00813                         int err;
00814                         string result;
00815 
00816                         sqlite3_bind_text(_statements[ROUTING_GET],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00817                         sqlite3_bind_int(_statements[ROUTING_GET],2,key);
00818                         err = sqlite3_step(_statements[ROUTING_GET]);
00819                         if(err == SQLITE_ROW){
00820                                 result = (const char*) sqlite3_column_text(_statements[ROUTING_GET],0);
00821                         }
00822                         sqlite3_reset(_statements[ROUTING_GET]);
00823                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00824                                 stringstream error;
00825                                 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err;
00826                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00827                                 throw SQLiteQueryException(error.str());
00828                         }
00829 
00830                         return result;
00831                 }
00832 
00833                 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key)
00834                 {
00835                         int err;
00836                         string result;
00837 
00838                         err = sqlite3_bind_text(_statements[NODE_GET],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00839                         err = sqlite3_bind_int(_statements[NODE_GET],2,key);
00840                         err = sqlite3_step(_statements[NODE_GET]);
00841                         if(err == SQLITE_ROW){
00842                                 result = (const char*) sqlite3_column_text(_statements[NODE_GET],0);
00843                         }
00844                         sqlite3_reset(_statements[NODE_GET]);
00845                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00846                                 stringstream error;
00847                                 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00848                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00849                                 throw SQLiteQueryException(error.str());
00850                         }
00851 
00852                         return result;
00853                 }
00854 
00855                 std::string SQLiteBundleStorage::getRoutingInfo(const int &key)
00856                 {
00857                         int err;
00858                         string result;
00859 
00860                         sqlite3_bind_int(_statements[INFO_GET],1,key);
00861                         err = sqlite3_step(_statements[INFO_GET]);
00862                         if(err == SQLITE_ROW){
00863                                 result = (const char*) sqlite3_column_text(_statements[INFO_GET],0);
00864                         }
00865                         sqlite3_reset(_statements[INFO_GET]);
00866                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00867                                 stringstream error;
00868                                 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00869                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00870                                 throw SQLiteQueryException(error.str());
00871                         }
00872 
00873                         return result;
00874                 }
00875 
00876                 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo)
00877                 {
00878                         int err;
00879 
00880                         sqlite3_bind_text(_statements[ROUTING_STORE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00881                         sqlite3_bind_int(_statements[ROUTING_STORE],2,key);
00882                         sqlite3_bind_text(_statements[ROUTING_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00883                         err = sqlite3_step(_statements[ROUTING_STORE]);
00884                         sqlite3_reset(_statements[ROUTING_STORE]);
00885                         if(err == SQLITE_CONSTRAINT)
00886                                 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl;
00887                         else if(err != SQLITE_DONE){
00888                                 stringstream error;
00889                                 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err;
00890                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00891                                 throw SQLiteQueryException(error.str());
00892                         }
00893                 }
00894 
00895                 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo)
00896                 {
00897                         int err;
00898 
00899                         sqlite3_bind_text(_statements[NODE_STORE],1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT);
00900                         sqlite3_bind_int(_statements[NODE_STORE],2,key);
00901                         sqlite3_bind_text(_statements[NODE_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00902                         err = sqlite3_step(_statements[NODE_STORE]);
00903                         sqlite3_reset(_statements[NODE_STORE]);
00904                         if(err == SQLITE_CONSTRAINT)
00905                                 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl;
00906                         else if(err != SQLITE_DONE){
00907                                 stringstream error;
00908                                 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err;
00909                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00910                                 throw SQLiteQueryException(error.str());
00911                         }
00912                 }
00913 
00914                 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo)
00915                 {
00916                         int err;
00917 
00918                         sqlite3_bind_int(_statements[INFO_STORE],1,key);
00919                         sqlite3_bind_text(_statements[INFO_STORE],2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00920                         err = sqlite3_step(_statements[INFO_STORE]);
00921                         sqlite3_reset(_statements[INFO_STORE]);
00922                         if(err == SQLITE_CONSTRAINT)
00923                                 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl;
00924                         else if(err != SQLITE_DONE){
00925                                 stringstream error;
00926                                 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err;
00927                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00928                                 throw SQLiteQueryException(error.str());
00929                         }
00930                 }
00931 
00932                 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00933                 {
00934                         int err;
00935 
00936                         sqlite3_bind_text(_statements[ROUTING_REMOVE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00937                         sqlite3_bind_int(_statements[ROUTING_REMOVE],2,key);
00938                         err = sqlite3_step(_statements[ROUTING_REMOVE]);
00939                         sqlite3_reset(_statements[ROUTING_REMOVE]);
00940                         if(err != SQLITE_DONE){
00941                                 stringstream error;
00942                                 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err;
00943                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00944                                 throw SQLiteQueryException(error.str());
00945                         }
00946                 }
00947 
00948                 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key)
00949                 {
00950                         int err;
00951 
00952                         sqlite3_bind_text(_statements[NODE_REMOVE],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00953                         sqlite3_bind_int(_statements[NODE_REMOVE],2,key);
00954                         err = sqlite3_step(_statements[NODE_REMOVE]);
00955                         sqlite3_reset(_statements[NODE_REMOVE]);
00956                         if(err != SQLITE_DONE){
00957                                 stringstream error;
00958                                 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err;
00959                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00960                                 throw SQLiteQueryException(error.str());
00961                         }
00962                 }
00963 
00964                 void SQLiteBundleStorage::removeRoutingInfo(const int &key)
00965                 {
00966                         int err;
00967 
00968                         sqlite3_bind_int(_statements[INFO_REMOVE],1,key);
00969                         err = sqlite3_step(_statements[INFO_REMOVE]);
00970                         sqlite3_reset(_statements[INFO_REMOVE]);
00971                         if(err != SQLITE_DONE){
00972                                 stringstream error;
00973                                 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err;
00974                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00975                                 throw SQLiteQueryException(error.str());
00976                         }
00977                 }
00978 #endif
00979 
00980                 void SQLiteBundleStorage::clearAll()
00981                 {
00982 #ifdef SQLITE_STORAGE_EXTENDED
00983                         sqlite3_step(_statements[ROUTING_CLEAR]);
00984                         sqlite3_reset(_statements[ROUTING_CLEAR]);
00985                         sqlite3_step(_statements[NODE_CLEAR]);
00986                         sqlite3_reset(_statements[NODE_CLEAR]);
00987 #endif
00988 
00989                         clear();
00990                 }
00991 
00992                 void SQLiteBundleStorage::clear()
00993                 {
00994                         AutoResetLock l(_locks[VACUUM], _statements[VACUUM]);
00995 
00996                         sqlite3_step(_statements[BUNDLE_CLEAR]);
00997                         sqlite3_reset(_statements[BUNDLE_CLEAR]);
00998                         sqlite3_step(_statements[BLOCK_CLEAR]);
00999                         sqlite3_reset(_statements[BLOCK_CLEAR]);
01000 
01001                         if(SQLITE_DONE != sqlite3_step(_statements[VACUUM]))
01002                         {
01003                                 sqlite3_reset(_statements[VACUUM]);
01004                                 throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed.");
01005                         }
01006 
01007                         //Delete Folder SQL_TABLE_BLOCK containing Blocks
01008                         {
01009                                 _blockPath.remove(true);
01010                                 ibrcommon::File::createDirectory(_blockPath);
01011                         }
01012 
01013                         reset_expire_time();
01014                 }
01015 
01016 
01017 
01018                 bool SQLiteBundleStorage::empty()
01019                 {
01020                         AutoResetLock l(_locks[EMPTY_CHECK], _statements[EMPTY_CHECK]);
01021 
01022                         if (SQLITE_DONE == sqlite3_step(_statements[EMPTY_CHECK]))
01023                         {
01024                                 return true;
01025                         }
01026                         else
01027                         {
01028                                 return false;
01029                         }
01030                 }
01031 
01032                 unsigned int SQLiteBundleStorage::count()
01033                 {
01034                         int rows = 0;
01035                         int err = 0;
01036 
01037                         AutoResetLock l(_locks[COUNT_ENTRIES], _statements[COUNT_ENTRIES]);
01038 
01039                         if ((err = sqlite3_step(_statements[COUNT_ENTRIES])) == SQLITE_ROW)
01040                         {
01041                                 rows = sqlite3_column_int(_statements[COUNT_ENTRIES], 0);
01042                         }
01043                         else
01044                         {
01045                                 stringstream error;
01046                                 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(_database);
01047                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01048                                 throw SQLiteQueryException(error.str());
01049                         }
01050 
01051                         return rows;
01052                 }
01053 
01054 #ifdef SQLITE_STORAGE_EXTENDED
01055                 int SQLiteBundleStorage::occupiedSpace()
01056                 {
01057                         int size = 0;
01058                         std::list<ibrcommon::File> files;
01059 
01060                         if (_blockPath.getFiles(files))
01061                         {
01062                                 IBRCOMMON_LOGGER(error) << "occupiedSpace: unable to open Directory " << _blockPath.getPath() << IBRCOMMON_LOGGER_ENDL;
01063                                 return -1;
01064                         }
01065 
01066                         for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
01067                         {
01068                                 const ibrcommon::File &f = (*iter);
01069 
01070                                 if (!f.isSystem())
01071                                 {
01072                                         size += f.size();
01073                                 }
01074                         }
01075 
01076                         // add db file size
01077                         size += dbFile.size();
01078 
01079                         return size;
01080                 }
01081 
01082                 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle)
01083                 {
01084                         int err = 0;
01085                         size_t bitcounter = 0;
01086                         bool allFragmentsReceived(true);
01087                         size_t payloadsize, TTL, fragmentoffset;
01088 
01089                         ibrcommon::File payloadfile, filename;
01090 
01091                         // generate a bundle id string
01092                         std::string bundleID = dtn::data::BundleID(bundle).toString();
01093 
01094                         // get the destination id string
01095                         std::string destination = bundle._destination.getString();
01096 
01097                         // get the source id string
01098                         std::string sourceEID = bundle._source.getString();
01099 
01100                         // calculate the expiration timestamp
01101                         TTL = bundle._timestamp + bundle._lifetime;
01102 
01103                         const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>();
01104                         ibrcommon::BLOB::Reference payloadBlob = block.getBLOB();
01105 
01106                         // get the payload size
01107                         {
01108                                 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01109                                 payloadsize = io.size();
01110                         }
01111 
01112                         //Saves the blocks from the first and the last Fragment, they may contain Extentionblock
01113                         bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength;
01114                         bool first = bundle._fragmentoffset == 0;
01115 
01116                         if(first || last)
01117                         {
01118                                 int i = 0, blocknumber = 0;
01119 
01120                                 // get the list of blocks
01121                                 std::list<const dtn::data::Block*> blocklist = bundle.getBlocks();
01122 
01123                                 // iterate through all blocks
01124                                 for (std::list<const dtn::data::Block* >::const_iterator it = blocklist.begin(); it != blocklist.end(); it++, i++)
01125                                 {
01126                                         // generate a temporary block file
01127                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01128 
01129                                         ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01130 
01131                                         dtn::data::SeparateSerializer serializer(datei);
01132                                         serializer << (*(*it));
01133                                         datei.close();
01134 
01135                                         /* Schreibt Blöcke in die DB.
01136                                          * Die Blocknummern werden nach folgendem Schema in die DB geschrieben:
01137                                          *              Die Blöcke des ersten Fragments fangen bei -x an und gehen bis -1
01138                                          *              Die Blöcke des letzten Fragments beginnen bei 1 und gehen bis x
01139                                          *              Der Payloadblock wird an Stelle 0 eingefügt.
01140                                          */
01141                                         sqlite3_bind_text(_statements[BLOCK_STORE], 1, bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01142                                         sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01143                                         sqlite3_bind_text(_statements[BLOCK_STORE], 3, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01144 
01145                                         blocknumber = blocklist.size() - i;
01146 
01147                                         if (first)
01148                                         {
01149                                                 blocknumber = (blocklist.size() - i)*(-1);
01150                                         }
01151 
01152                                         sqlite3_bind_int(_statements[BLOCK_STORE],4,blocknumber);
01153                                         executeQuery(_statements[BLOCK_STORE]);
01154                                 }
01155                         }
01156 
01157                         //At first the database is checked for already received bundles and some
01158                         //informations are read, which effect the following program flow.
01159                         sqlite3_bind_text(_statements[BUNDLE_GET_FRAGMENT],1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT);
01160                         sqlite3_bind_int64(_statements[BUNDLE_GET_FRAGMENT],2,bundle._timestamp);
01161                         sqlite3_bind_int(_statements[BUNDLE_GET_FRAGMENT],3,bundle._sequencenumber);
01162                         err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01163 
01164                         //Es ist noch kein Eintrag vorhanden: generiere Payloadfilename
01165                         if(err == SQLITE_DONE)
01166                         {
01167                                 // generate a temporary block file
01168                                 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01169                                 payloadfile = tmpfile;
01170                         }
01171                         //Es ist bereits ein eintrag vorhanden: speicher den payloadfilename
01172                         else if (err == SQLITE_ROW)
01173                         {
01174                                 payloadfile = ibrcommon::File( (const char*)sqlite3_column_text(_statements[BUNDLE_GET_FRAGMENT], 14) );                                //14 = Payloadfilename
01175                         }
01176 
01177                         while (err == SQLITE_ROW){
01178                                 //The following codepice summs the payloadfragmentbytes, to check if all fragments were received
01179                                 //and the bundle is complete.
01180                                 fragmentoffset = sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],9);                                                                // 9 = fragmentoffset
01181 
01182                                 if(bitcounter >= fragmentoffset){
01183                                         bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],13);    //13 = payloadlength
01184                                 }
01185                                 else if(bitcounter >= bundle._fragmentoffset){
01186                                         bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01187                                 }
01188                                 else{
01189                                         allFragmentsReceived = false;
01190                                 }
01191                                 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01192                         }
01193 
01194                         if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived) //Wenn das aktuelle fragment das größte ist, ist die Liste durchlaufen und das Frag nicht dazugezählt. Dies wird hier nachgeholt.
01195                         {
01196                                 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01197                         }
01198 
01199                         sqlite3_reset(_statements[BUNDLE_GET_FRAGMENT]);
01200                         if(err != SQLITE_DONE){
01201                                 stringstream error;
01202                                 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(_database);
01203                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01204                                 throw SQLiteQueryException(error.str());
01205                         }
01206 
01207                         //Save the Payload in a separate file
01208                         std::fstream datei(payloadfile.getPath().c_str(), std::ios::in | std::ios::out | std::ios::binary);
01209                         datei.seekp(bundle._fragmentoffset);
01210                         {
01211                                 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01212 
01213                                 size_t ret = 0;
01214                                 istream &s = (*io);
01215                                 s.seekg(0);
01216 
01217                                 while (true)
01218                                 {
01219                                         char buf;
01220                                         s.get(buf);
01221                                         if(!s.eof()){
01222                                                 datei.put(buf);
01223                                                 ret++;
01224                                         }
01225                                         else
01226                                                 break;
01227                                 }
01228                                 datei.close();
01229                         }
01230 
01231                         //All fragments received
01232                         if(bundle._appdatalength == bitcounter)
01233                         {
01234                                 /*
01235                                  * 1. Payloadblock zusammensetzen
01236                                  * 2. PayloadBlock in der BlockTable sichern
01237                                  * 3. Primary BlockInfos in die BundleTable eintragen.
01238                                  * 4. Einträge der Fragmenttabelle löschen
01239                                  */
01240 
01241                                 // 1. Get the payloadblockobject
01242                                 {
01243                                         ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(payloadfile));
01244                                         dtn::data::PayloadBlock pb(ref);
01245 
01246                                         //Copy EID list
01247                                         if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS))
01248                                         {
01249                                                 std::list<dtn::data::EID> eidlist;
01250                                                 std::list<dtn::data::EID>::iterator eidIt;
01251                                                 eidlist = block.getEIDList();
01252 
01253                                                 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++)
01254                                                 {
01255                                                         pb.addEID(*eidIt);
01256                                                 }
01257                                         }
01258 
01259                                         //Copy ProcFlags
01260                                         pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT));
01261                                         pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED));
01262                                         pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED));
01263                                         pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK));
01264                                         pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED));
01265                                         pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED));
01266                                         pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS));
01267 
01268                                         //2. Save the PayloadBlock to a file and store the metainformation in the BlockTable
01269                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01270 
01271                                         std::ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01272                                         dtn::data::SeparateSerializer serializer(datei);
01273                                         serializer << (pb);
01274                                         datei.close();
01275 
01276                                         filename = tmpfile;
01277                                 }
01278 
01279                                 sqlite3_bind_text(_statements[BLOCK_STORE], 1,bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01280 //                              sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01281                                 sqlite3_bind_text(_statements[BLOCK_STORE], 3, filename.getPath().c_str(), filename.getPath().length(), SQLITE_TRANSIENT);
01282                                 sqlite3_bind_int(_statements[BLOCK_STORE], 4, 0);
01283                                 executeQuery(_statements[BLOCK_STORE]);
01284 
01285                                 //3. Delete reassembled Payload data
01286                                 payloadfile.remove();
01287 
01288                                 //4. Remove Fragment entries from BundleTable
01289                                 sqlite3_bind_text(_statements[BUNDLE_DELETE],1,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01290                                 sqlite3_bind_int64(_statements[BUNDLE_DELETE],2,bundle._sequencenumber);
01291                                 sqlite3_bind_int64(_statements[BUNDLE_DELETE],3,bundle._timestamp);
01292                                 executeQuery(_statements[BUNDLE_DELETE]);
01293 
01294                                 //5. Write the Primary Block to the BundleTable
01295                                 size_t procFlags = bundle._procflags;
01296                                 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT);
01297 
01298                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01299                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01300                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01301                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01302                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01303                                 sqlite3_bind_int(_statements[BUNDLE_STORE],6,procFlags);
01304                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01305                                 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01306                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01307                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01308                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01309                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01310                                 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01311                                 sqlite3_bind_int(_statements[BUNDLE_STORE],14,NULL);
01312                                 sqlite3_bind_text(_statements[BUNDLE_STORE],15,NULL,0,SQLITE_TRANSIENT);
01313                                 executeQuery(_statements[BUNDLE_STORE]);
01314                         }
01315 
01316                         //There are some fragments missing
01317                         else{
01318                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01319                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01320                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01321                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01322                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01323                                 sqlite3_bind_int(_statements[BUNDLE_STORE],6,bundle._procflags);
01324                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01325                                 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01326                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01327                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01328                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01329                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01330                                 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01331                                 sqlite3_bind_int(_statements[BUNDLE_STORE],14,payloadsize);
01332                                 sqlite3_bind_text(_statements[BUNDLE_STORE],15,payloadfile.getPath().c_str(),payloadfile.getPath().length(),SQLITE_TRANSIENT);
01333                                 executeQuery(_statements[BUNDLE_STORE]);
01334                         }
01335                 }
01336 #endif
01337 
01338                 void SQLiteBundleStorage::raiseEvent(const Event *evt)
01339                 {
01340                         try {
01341                                 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
01342 
01343                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
01344                                 {
01345                                         _tasks.push(new TaskExpire(time.getTimestamp()));
01346                                 }
01347                         } catch (const std::bad_cast&) { }
01348                         
01349                         try {
01350                                 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt);
01351 
01352                                 if(global.getAction() == GlobalEvent::GLOBAL_IDLE)
01353                                 {
01354                                         // switch to idle mode
01355                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
01356                                         TaskIdle::_idle = true;
01357 
01358                                         // generate an idle task
01359                                         _tasks.push(new TaskIdle());
01360                                 }
01361                                 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY)
01362                                 {
01363                                         // switch back to non-idle mode
01364                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
01365                                         TaskIdle::_idle = false;
01366                                 }
01367                         } catch (const std::bad_cast&) { }
01368                 }
01369 
01370                 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
01371                 {
01372                         /*
01373                          * Only if the actual time is bigger or equal than the time when the next bundle expires, deleteexpired is called.
01374                          */
01375                         size_t exp_time = storage.get_expire_time();
01376                         if ((_timestamp < exp_time) || (exp_time == 0)) return;
01377 
01378                         /*
01379                          * Performanceverbesserung: Damit die Abfragen nicht jede Sekunde ausgeführt werden müssen, speichert man den Zeitpunkt an dem
01380                          * das nächste Bündel gelöscht werden soll in eine Variable und fürhrt deleteexpired erst dann aus wenn ein Bündel abgelaufen ist.
01381                          * Nach dem Löschen wird die DB durchsucht und der nächste Ablaufzeitpunkt wird in die Variable gesetzt.
01382                          */
01383 
01384                         {
01385                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_FILENAMES], storage._statements[EXPIRE_BUNDLE_FILENAMES]);
01386 
01387                                 // query for blocks of expired bundles
01388                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_FILENAMES], 1, _timestamp);
01389                                 while (sqlite3_step(storage._statements[EXPIRE_BUNDLE_FILENAMES]) == SQLITE_ROW)
01390                                 {
01391                                         ibrcommon::File block((const char*)sqlite3_column_text(storage._statements[EXPIRE_BUNDLE_FILENAMES],0));
01392                                         block.remove();
01393                                 }
01394                         }
01395 
01396                         {
01397                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLES], storage._statements[EXPIRE_BUNDLES]);
01398 
01399                                 // query expired bundles
01400                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLES], 1, _timestamp);
01401                                 while (sqlite3_step(storage._statements[EXPIRE_BUNDLES]) == SQLITE_ROW)
01402                                 {
01403                                         dtn::data::BundleID id;
01404                                         storage.get_bundleid(storage._statements[EXPIRE_BUNDLES], id);
01405                                         dtn::core::BundleExpiredEvent::raise(id);
01406                                 }
01407                         }
01408 
01409                         {
01410                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_DELETE], storage._statements[EXPIRE_BUNDLE_DELETE]);
01411 
01412                                 // delete all expired db entries (bundles and blocks)
01413                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_DELETE], 1, _timestamp);
01414                                 sqlite3_step(storage._statements[EXPIRE_BUNDLE_DELETE]);
01415                         }
01416 
01417                         //update deprecated timer
01418                         storage.update_expire_time();
01419                 }
01420 
01421                 void SQLiteBundleStorage::update_expire_time()
01422                 {
01423                         ibrcommon::MutexLock l(_locks[EXPIRE_NEXT_TIMESTAMP]);
01424                         int err = sqlite3_step(_statements[EXPIRE_NEXT_TIMESTAMP]);
01425 
01426                         if (err == SQLITE_ROW)
01427                         {
01428                                 ibrcommon::MutexLock l(_next_expiration_lock);
01429                                 _next_expiration = sqlite3_column_int64(_statements[EXPIRE_NEXT_TIMESTAMP], 0);
01430                         }
01431                         else
01432                         {
01433                                 ibrcommon::MutexLock l(_next_expiration_lock);
01434                                 _next_expiration = 0;
01435                         }
01436 
01437                         sqlite3_reset(_statements[EXPIRE_NEXT_TIMESTAMP]);
01438                 }
01439 
01440                 void SQLiteBundleStorage::update_custodian(const dtn::data::BundleID &id, const dtn::data::EID &custodian)
01441                 {
01442                         // select query with or without fragmentation extension
01443                         STORAGE_STMT query = BUNDLE_UPDATE_CUSTODIAN;
01444                         if (id.fragment)        query = FRAGMENT_UPDATE_CUSTODIAN;
01445 
01446 
01447                         AutoResetLock l(_locks[query], _statements[query]);
01448 
01449                         sqlite3_bind_text(_statements[query], 1, custodian.getString().c_str(), custodian.getString().length(), SQLITE_TRANSIENT);
01450                         set_bundleid(_statements[query], id, 1);
01451 
01452                         // update the custodian in the database
01453                         int err = sqlite3_step(_statements[query]);
01454 
01455                         if (err != SQLITE_DONE)
01456                         {
01457                                 stringstream error;
01458                                 error << "SQLiteBundleStorage: update_custodian() failure: " << err << " " <<  sqlite3_errmsg(_database);
01459                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01460                         }
01461                 }
01462 
01463                 int SQLiteBundleStorage::store_blocks(const data::Bundle &bundle)
01464                 {
01465                         int blocktyp, blocknumber(1), storedBytes(0);
01466 
01467                         // get all blocks of the bundle
01468                         const list<const data::Block*> blocklist = bundle.getBlocks();
01469 
01470                         // generate a bundle id
01471                         data::BundleID id(bundle);
01472 
01473                         for(std::list<const data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++)
01474                         {
01475                                 blocktyp = (int)(*it)->getType();
01476 
01477                                 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01478                                 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary);
01479 
01480                                 dtn::data::SeparateSerializer serializer(filestream);
01481                                 serializer << (*(*it));
01482                                 storedBytes += serializer.getLength(*(*it));
01483 
01484                                 filestream.close();
01485 
01486                                 // protect this query from concurrent access and enable the auto-reset feature
01487                                 AutoResetLock l(_locks[BLOCK_STORE], _statements[BLOCK_STORE]);
01488 
01489                                 // set bundle key data
01490                                 set_bundleid(_statements[BLOCK_STORE], id);
01491 
01492                                 // set the column four to null if this is not a fragment
01493                                 if (!id.fragment) sqlite3_bind_null(_statements[BLOCK_STORE], 4);
01494 
01495                                 // set the block type
01496                                 sqlite3_bind_int(_statements[BLOCK_STORE], 5, blocktyp);
01497 
01498                                 // the filename of the block data
01499                                 sqlite3_bind_text(_statements[BLOCK_STORE], 6, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01500 
01501                                 // the ordering number
01502                                 sqlite3_bind_int(_statements[BLOCK_STORE], 7, blocknumber);
01503 
01504                                 // execute the query and store the block in the database
01505                                 if (sqlite3_step(_statements[BLOCK_STORE]) != SQLITE_DONE)
01506                                 {
01507                                         throw SQLiteQueryException("can not store block of bundle");
01508                                 }
01509 
01510                                 //increment blocknumber
01511                                 blocknumber++;
01512                         }
01513 
01514                         return storedBytes;
01515                 }
01516 
01517                 void SQLiteBundleStorage::get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id)
01518                 {
01519                         int err = 0;
01520                         string file;
01521                         std::list<ibrcommon::File> files;
01522 
01523                         // select the right statement to use
01524                         const size_t stmt_key = id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
01525 
01526                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
01527 
01528                         // set the bundle key values
01529                         set_bundleid(_statements[stmt_key], id);
01530 
01531                         // query the database and step through all blocks
01532                         while ((err = sqlite3_step(_statements[stmt_key])) == SQLITE_ROW)
01533                         {
01534                                 files.push_back( ibrcommon::File( (const char*) sqlite3_column_text(_statements[stmt_key], 0) ) );
01535                         }
01536 
01537                         if (err == SQLITE_DONE)
01538                         {
01539                                 if (files.size() == 0)
01540                                 {
01541                                         IBRCOMMON_LOGGER(error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
01542                                         throw SQLiteQueryException("no blocks found");
01543                                 }
01544                         }
01545                         else
01546                         {
01547                                 IBRCOMMON_LOGGER(error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
01548                                 throw SQLiteQueryException("can not query for blocks");
01549                         }
01550 
01551                         //De-serialize the Blocks
01552                         for (std::list<ibrcommon::File>::const_iterator it = files.begin(); it != files.end(); it++)
01553                         {
01554                                 const ibrcommon::File &f = (*it);
01555 
01556                                 // open the file
01557                                 std::ifstream is(f.getPath().c_str(), std::ios::binary | std::ios::in);
01558 
01559                                 // read the block
01560                                 dtn::data::SeparateDeserializer(is, bundle).readBlock();
01561 
01562                                 // close the file
01563                                 is.close();
01564                         }
01565                 }
01566 
01567                 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
01568                 {
01569                         // until IDLE is false
01570                         while (true)
01571                         {
01572                                 /*
01573                                  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
01574                                  * This empty space will be reused the next time new information is added to the database. But in the meantime,
01575                                  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
01576                                  * cause the information in the database to become fragmented - scrattered out all across the database file rather
01577                                  * than clustered together in one place.
01578                                  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
01579                                  * and otherwise cleans up the database file structure.
01580                                  */
01581                                 {
01582                                         AutoResetLock l(storage._locks[VACUUM], storage._statements[VACUUM]);
01583                                         sqlite3_step(storage._statements[VACUUM]);
01584                                 }
01585 
01586                                 // here we can do some IDLE stuff...
01587 				::sleep(1);
01588 
01589                                 ibrcommon::MutexLock l(TaskIdle::_mutex);
01590                                 if (!TaskIdle::_idle) return;
01591                         }
01592                 }
01593 
01594                 const std::string SQLiteBundleStorage::getName() const
01595                 {
01596                         return "SQLiteBundleStorage";
01597                 }
01598 
01599                 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
01600                 {
01601                         // custody is successful transferred to another node.
01602                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
01603                         // update the custodian of this bundle with the new one
01604                         update_custodian(id, custodian);
01605                 }
01606 
01607                 void SQLiteBundleStorage::new_expire_time(size_t ttl)
01608                 {
01609                         ibrcommon::MutexLock l(_next_expiration_lock);
01610                         if (_next_expiration == 0 || ttl < _next_expiration)
01611                         {
01612                                 _next_expiration = ttl;
01613                         }
01614                 }
01615 
01616                 void SQLiteBundleStorage::reset_expire_time()
01617                 {
01618                         ibrcommon::MutexLock l(_next_expiration_lock);
01619                         _next_expiration = 0;
01620                 }
01621 
01622                 size_t SQLiteBundleStorage::get_expire_time()
01623                 {
01624                         ibrcommon::MutexLock l(_next_expiration_lock);
01625                         return _next_expiration;
01626                 }
01627 
01628                 void SQLiteBundleStorage::add_deletion(const dtn::data::BundleID &id)
01629                 {
01630                         ibrcommon::MutexLock l(_deletion_mutex);
01631                         _deletion_list.insert(id);
01632                 }
01633 
01634                 void SQLiteBundleStorage::remove_deletion(const dtn::data::BundleID &id)
01635                 {
01636                         ibrcommon::MutexLock l(_deletion_mutex);
01637                         _deletion_list.erase(id);
01638                 }
01639 
01640                 bool SQLiteBundleStorage::contains_deletion(const dtn::data::BundleID &id)
01641                 {
01642                         ibrcommon::MutexLock l(_deletion_mutex);
01643                         return (_deletion_list.find(id) != _deletion_list.end());
01644                 }
01645 
01646                 void SQLiteBundleStorage::trylock() throw (ibrcommon::MutexException)
01647                 {
01648                         // not implemented
01649                         throw ibrcommon::MutexException();
01650                 }
01651 
01652                 void SQLiteBundleStorage::enter() throw (ibrcommon::MutexException)
01653                 {
01654                         // lock all statements
01655                         for (int i = 0; i < SQL_QUERIES_END; i++)
01656                         {
01657                                 // lock the statement
01658                                 _locks[i].enter();
01659                         }
01660                 }
01661 
01662                 void SQLiteBundleStorage::leave() throw (ibrcommon::MutexException)
01663                 {
01664                         // unlock all statements
01665                         for (int i = 0; i < SQL_QUERIES_END; i++)
01666                         {
01667                                 // unlock the statement
01668                                 _locks[i].leave();
01669                         }
01670                 }
01671 
01672                 void SQLiteBundleStorage::set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset) const
01673                 {
01674                         const std::string source_id = id.source.getString();
01675                         sqlite3_bind_text(st, offset + 1, source_id.c_str(), source_id.length(), SQLITE_TRANSIENT);
01676                         sqlite3_bind_int64(st, offset + 2, id.timestamp);
01677                         sqlite3_bind_int64(st, offset + 3, id.sequencenumber);
01678 
01679                         if (id.fragment)
01680                         {
01681                                 sqlite3_bind_int64(st, offset + 4, id.offset);
01682                         }
01683                 }
01684 
01685                 void SQLiteBundleStorage::get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset) const
01686                 {
01687                         id.source = dtn::data::EID((const char*)sqlite3_column_text(st, offset + 0));
01688                         id.timestamp = sqlite3_column_int64(st, offset + 1);
01689                         id.sequencenumber = sqlite3_column_int64(st, offset + 2);
01690 
01691                         id.fragment = (sqlite3_column_text(st, offset + 3) != NULL);
01692                         id.offset = sqlite3_column_int64(st, offset + 3);
01693                 }
01694         }
01695 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1