IBR-DTNSuite 0.6

daemon/src/core/SQLiteBundleStorage.cpp

Go to the documentation of this file.
00001 /*
00002  * SQLiteBundleStorage.cpp
00003  *
00004  *  Created on: 09.01.2010
00005  *      Author: Myrtus
00006  */
00007 
00008 
00009 #include "core/SQLiteBundleStorage.h"
00010 #include "core/TimeEvent.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "core/SQLiteConfigure.h"
00014 #include "core/BundleEvent.h"
00015 #include "core/BundleExpiredEvent.h"
00016 
00017 #include <ibrdtn/data/PayloadBlock.h>
00018 #include <ibrdtn/data/AgeBlock.h>
00019 #include <ibrdtn/data/Serializer.h>
00020 #include <ibrdtn/data/Bundle.h>
00021 #include <ibrdtn/data/BundleID.h>
00022 
00023 #include <ibrcommon/thread/MutexLock.h>
00024 #include <ibrcommon/data/BLOB.h>
00025 #include <ibrcommon/AutoDelete.h>
00026 #include <ibrcommon/Logger.h>
00027 
00028 namespace dtn
00029 {
00030         namespace core
00031         {
00032                 void sql_tracer(void*, const char * pQuery)
00033                 {
00034                         IBRCOMMON_LOGGER_DEBUG(50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
00035                 }
00036 
00037                 const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] =
00038                                 { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes" };
00039 
00040                 const std::string SQLiteBundleStorage::_sql_queries[SQL_QUERIES_END] =
00041                 {
00042                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC LIMIT ?,?;",
00043                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL LIMIT 1;",
00044                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;",
00045                         "SELECT * FROM " + _tables[SQL_TABLE_BUNDLE] + " WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset != NULL ORDER BY fragmentoffset ASC;",
00046 
00047                         "SELECT source, timestamp, sequencenumber, fragmentoffset, procflags FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00048                         "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND a.fragmentoffset = b.fragmentoffset AND a.expiretime <= ?;",
00049                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00050                         "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;",
00051 
00052                         "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;",
00053                         "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00054 
00055                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00056                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00057                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00058                         "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);",
00059                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00060                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00061 
00062                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00063 
00064                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL ORDER BY ordernumber ASC;",
00065                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;",
00066                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL AND ordernumber = ?;",
00067                         "SELECT filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;",
00068                         "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";",
00069                         "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);",
00070 
00071 #ifdef SQLITE_STORAGE_EXTENDED
00072                         "SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00073                         "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00074                         "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";",
00075                         "INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);",
00076 
00077                         "SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00078                         "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00079                         "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";",
00080                         "INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);",
00081 
00082                         "SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00083                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00084                         "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);",
00085 #endif
00086 
00087                         "VACUUM;"
00088                 };
00089 
00090 //              const std::string SQLiteBundleStorage::_db_structure[11] =
00091 //              {
00092 //                      "create table if not exists "+ _tables[SQL_TABLE_BUNDLE] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, Source text, Destination text, Reportto text, Custodian text, ProcFlags int, Timestamp int, Sequencenumber int, Lifetime int, Fragmentoffset int, Appdatalength int, TTL int, Size int, Payloadlength int, Payloadfilename text);",
00093 //                      "create table if not exists "+ _tables[SQL_TABLE_BLOCK] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, BlockType int, Filename text, Blocknumber int);",
00094 //                      "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
00095 //                      "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
00096 //                      "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
00097 //                      "CREATE TRIGGER IF NOT EXISTS Blockdelete AFTER DELETE ON "+ _tables[SQL_TABLE_BUNDLE] +" FOR EACH ROW BEGIN DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE BundleID = OLD.BundleID; DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = OLD.BundleID; END;",
00098 //                      "CREATE TRIGGER IF NOT EXISTS BundleRoutingdelete BEFORE INSERT ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" BEGIN SELECT CASE WHEN (SELECT BundleID FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = NEW.BundleID) IS NULL THEN RAISE(ABORT, \"Foreign Key Violation: BundleID doesn't exist in BundleTable\")END; END;",
00099 //                      "CREATE INDEX IF NOT EXISTS BlockIDIndex ON "+ _tables[SQL_TABLE_BLOCK] +" (BundleID);",
00100 //                      "CREATE UNIQUE INDEX IF NOT EXISTS BundleIDIndex ON "+ _tables[SQL_TABLE_BUNDLE] +" (BundleID);",
00101 //                      "CREATE UNIQUE INDEX IF NOT EXISTS RoutingKeyIndex ON "+ _tables[SQL_TABLE_ROUTING] +" (Key);",
00102 //                      "CREATE UNIQUE INDEX IF NOT EXISTS BundleRoutingIDIndex ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID,Key);",
00103 //              };
00104 
00105                 const std::string SQLiteBundleStorage::_db_structure[10] =
00106                 {
00107                         "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
00108                         "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `appdatalength` INTEGER DEFAULT NULL, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL);",
00109                         "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
00110                         "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
00111                         "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
00112                         "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND ((" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset IS NULL AND old.fragmentoffset IS NULL) OR (" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset)); END;",
00113                         "CREATE UNIQUE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);",
00114                         "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);",
00115                         "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);",
00116                         "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);"
00117                         "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);"
00118                 };
00119 
00120                 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
00121                 bool SQLiteBundleStorage::TaskIdle::_idle = false;
00122 
00123                 SQLiteBundleStorage::SQLBundleQuery::SQLBundleQuery()
00124                  : _statement(NULL)
00125                 { }
00126 
00127                 SQLiteBundleStorage::SQLBundleQuery::~SQLBundleQuery()
00128                 {
00129                         // free the statement if used before
00130                         if (_statement != NULL)
00131                         {
00132                                 sqlite3_finalize(_statement);
00133                         }
00134                 }
00135 
00136                 SQLiteBundleStorage::AutoResetLock::AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st)
00137                  : _lock(mutex), _st(st)
00138                 {
00139                         //sqlite3_clear_bindings(st);
00140                 }
00141 
00142                 SQLiteBundleStorage::AutoResetLock::~AutoResetLock()
00143                 {
00144                         sqlite3_reset(_st);
00145                 }
00146 
00147                 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size)
00148                  : dbPath(path), dbFile(path.get("sqlite.db")), dbSize(size), _next_expiration(0)
00149                 {
00150                         //Configure SQLite Library
00151                         SQLiteConfigure::configure();
00152 
00153                         // check if SQLite is thread-safe
00154                         if (sqlite3_threadsafe() == 0)
00155                         {
00156                                 IBRCOMMON_LOGGER(critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
00157                                 throw ibrcommon::Exception("need threading support in sqlite!");
00158                         }
00159                 }
00160 
00161                 SQLiteBundleStorage::~SQLiteBundleStorage()
00162                 {
00163                         stop();
00164                         join();
00165 
00166                         ibrcommon::MutexLock l(*this);
00167 
00168                         // free all statements
00169                         for (int i = 0; i < SQL_QUERIES_END; i++)
00170                         {
00171                                 // prepare the statement
00172                                 sqlite3_finalize(_statements[i]);
00173                         }
00174 
00175                         //close Databaseconnection
00176                         if (sqlite3_close(_database) != SQLITE_OK)
00177                         {
00178                                 IBRCOMMON_LOGGER(error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL;
00179                         }
00180 
00181                         // shutdown sqlite library
00182                         SQLiteConfigure::shutdown();
00183                 }
00184 
00185                 void SQLiteBundleStorage::openDatabase(const ibrcommon::File &path)
00186                 {
00187                         ibrcommon::MutexLock l(*this);
00188 
00189                         // set the block path
00190                         _blockPath = dbPath.get(_tables[SQL_TABLE_BLOCK]);
00191 
00192                         //open the database
00193                         if (sqlite3_open_v2(path.getPath().c_str(), &_database,  SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
00194                         {
00195                                 IBRCOMMON_LOGGER(error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
00196                                 sqlite3_close(_database);
00197                                 throw ibrcommon::Exception("Unable to open sqlite database");
00198                         }
00199 
00200                         int err = 0;
00201 
00202                         // create all tables
00203                         for (size_t i = 0; i < 10; i++)
00204                         {
00205                                 sqlite3_stmt *st = prepare(_db_structure[i]);
00206                                 err = sqlite3_step(st);
00207                                 if(err != SQLITE_DONE)
00208                                 {
00209                                         IBRCOMMON_LOGGER(error) << "SQLiteBundleStorage: failed to create database" << err << IBRCOMMON_LOGGER_ENDL;
00210                                 }
00211                                 sqlite3_reset(st);
00212                                 sqlite3_finalize(st);
00213                         }
00214 
00215                         // create the bundle folder
00216                         ibrcommon::File::createDirectory( _blockPath );
00217 
00218                         // prepare all statements
00219                         for (int i = 0; i < SQL_QUERIES_END; i++)
00220                         {
00221                                 // prepare the statement
00222                                 int err = sqlite3_prepare_v2(_database, _sql_queries[i].c_str(), _sql_queries[i].length(), &_statements[i], 0);
00223                                 if ( err != SQLITE_OK )
00224                                 {
00225                                         IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failed to prepare statement: " << err << " with query: " << _sql_queries[i] << IBRCOMMON_LOGGER_ENDL;
00226                                 }
00227                         }
00228 
00229                         // disable synchronous mode
00230                         sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL);
00231 
00232                         // enable sqlite tracing if debug level is higher than 50
00233                         if (IBRCOMMON_LOGGER_LEVEL >= 50)
00234                         {
00235                                 sqlite3_trace(_database, &sql_tracer, NULL);
00236                         }
00237                 }
00238 
00239                 void SQLiteBundleStorage::componentRun()
00240                 {
00241                         // loop until aborted
00242                         try {
00243                                 while (true)
00244                                 {
00245                                         Task *t = _tasks.getnpop(true);
00246 
00247                                         try {
00248                                                 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
00249                                                 try {
00250                                                         btask.run(*this);
00251                                                 } catch (const std::exception&) {
00252                                                         btask.abort();
00253                                                         continue;
00254                                                 };
00255                                                 btask.done();
00256                                                 continue;
00257                                         } catch (const std::bad_cast&) { };
00258 
00259                                         try {
00260                                                 ibrcommon::AutoDelete<Task> killer(t);
00261                                                 t->run(*this);
00262                                         } catch (const std::exception&) { };
00263                                 }
00264                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00265                                 // we are aborted, abort all blocking tasks
00266                         }
00267                 }
00268 
00269                 void SQLiteBundleStorage::componentUp()
00270                 {
00271                         //register Events
00272                         bindEvent(TimeEvent::className);
00273                         bindEvent(GlobalEvent::className);
00274 
00275                         // open the database and create all folders and files if needed
00276                         openDatabase(dbFile);
00277 
00278                         //Check if the database is consistent with the filesystem
00279                         check_consistency();
00280 
00281                         // calculate next Bundleexpiredtime
00282                         update_expire_time();
00283                 };
00284 
00285                 void SQLiteBundleStorage::componentDown()
00286                 {
00287                         //unregister Events
00288                         unbindEvent(TimeEvent::className);
00289                         unbindEvent(GlobalEvent::className);
00290                 };
00291 
00292                 bool SQLiteBundleStorage::__cancellation()
00293                 {
00294                         _tasks.abort();
00295                         return true;
00296                 }
00297 
00298                 void SQLiteBundleStorage::check_consistency()
00299                 {
00300                         /*
00301                          * check if the database is consistent with the files on the HDD. Therefore every file of the database is put in a list.
00302                          * When the files of the database are scanned it is checked if the actual file is in the list of files of the filesystem. if it is so the entry
00303                          * of the list of files of the filesystem is deleted. if not the file is put in a seperate list. After this procedure there are 2 lists containing file
00304                          * which are inconsistent and can be deleted.
00305                          */
00306                         int err;
00307                         size_t timestamp, sequencenumber, offset, pFlags, appdata, lifetime;
00308                         set<string> blockFiles,payloadfiles;
00309                         string datei;
00310 
00311                         list<ibrcommon::File> blist;
00312                         list<ibrcommon::File>::iterator file_it;
00313 
00314                         //1. Durchsuche die Dateien
00315                         _blockPath.getFiles(blist);
00316                         for(file_it = blist.begin(); file_it != blist.end(); file_it++)
00317                         {
00318                                 datei = (*file_it).getPath();
00319                                 if(datei[datei.size()-1] != '.')
00320                                 {
00321                                         if(((*file_it).getBasename())[0] == 'b')
00322                                         {
00323                                                 blockFiles.insert(datei);
00324                                         }
00325                                         else
00326                                                 payloadfiles.insert(datei);
00327                                 }
00328                         }
00329 
00330                         //3. Check consistency of the bundles
00331                         check_bundles(blockFiles);
00332 
00333                         //4. Check consistency of the fragments
00334                         check_fragments(payloadfiles);
00335                 }
00336 
00337                 void SQLiteBundleStorage::check_fragments(std::set<std::string> &payloadFiles)
00338                 {
00339                         int err;
00340                         size_t procFlags, timestamp, sequencenumber, appdatalength, lifetime;
00341                         string filename, source, dest, custody, repto;
00342                         set<string> consistenFiles, inconsistenSources;
00343                         set<size_t> inconsistentTimestamp, inconsistentSeq_number;
00344                         set<string>::iterator file_it, consisten_it;
00345 
00346                         sqlite3_stmt *getPayloadfiles = prepare("SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+_tables[SQL_TABLE_BUNDLE]+" WHERE fragmentoffset != NULL;");
00347 
00348                         for(err = sqlite3_step(getPayloadfiles); err == SQLITE_ROW; err = sqlite3_step(getPayloadfiles))
00349                         {
00350                                 filename = (const char*)sqlite3_column_text(getPayloadfiles,10);
00351                                 file_it = payloadFiles.find(filename);
00352 
00353 
00354                                 if (file_it == payloadFiles.end())
00355                                 {
00356                                         consisten_it = consistenFiles.find(*file_it);
00357 
00358                                         //inconsistent DB entry
00359                                         if(consisten_it == consistenFiles.end()){
00360                                                 //Generate Report
00361                                                 source  = (const char*) sqlite3_column_text(getPayloadfiles,0);
00362                                                 dest    = (const char*) sqlite3_column_text(getPayloadfiles,1);
00363                                                 repto   = (const char*) sqlite3_column_text(getPayloadfiles,2);
00364                                                 custody = (const char*) sqlite3_column_text(getPayloadfiles,3);
00365                                                 procFlags = sqlite3_column_int64(getPayloadfiles,4);
00366                                                 timestamp = sqlite3_column_int64(getPayloadfiles,5);
00367                                                 sequencenumber = sqlite3_column_int64(getPayloadfiles,6);
00368                                                 lifetime  = sqlite3_column_int64(getPayloadfiles,7);
00369                                                 appdatalength  = sqlite3_column_int64(getPayloadfiles,9);
00370 
00371                                                 dtn::data::BundleID id(dtn::data::EID(source),timestamp,sequencenumber);
00372                                                 dtn::data::MetaBundle mb(id, lifetime, dtn::data::DTNTime(), dtn::data::EID(dest), dtn::data::EID(repto), dtn::data::EID(custody), appdatalength, procFlags);
00373                                                 dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00374                                         }
00375                                 }
00376                                 //consistent DB entry
00377                                 else
00378                                         consistenFiles.insert(*file_it);
00379                                         payloadFiles.erase(file_it);
00380                         }
00381 
00382                         sqlite3_reset(getPayloadfiles);
00383                         sqlite3_finalize(getPayloadfiles);
00384                 }
00385 
00386                 void SQLiteBundleStorage::check_bundles(std::set<std::string> &blockFiles)
00387                 {
00388                         std::set<dtn::data::BundleID> corrupt_bundle_ids;
00389 
00390                         sqlite3_stmt *blockConistencyCheck = prepare("SELECT source_id, timestamp, sequencenumber, fragmentoffset, filename, ordernumber FROM "+ _tables[SQL_TABLE_BLOCK] +";");
00391 
00392                         while (sqlite3_step(blockConistencyCheck) == SQLITE_ROW)
00393                         {
00394                                 dtn::data::BundleID id;
00395 
00396                                 // get the bundleid
00397                                 get_bundleid(blockConistencyCheck, id);
00398 
00399                                 // get the filename
00400                                 std::string filename = (const char*)sqlite3_column_text(blockConistencyCheck, 4);
00401 
00402                                 // if the filename is not in the list of block files
00403                                 if (blockFiles.find(filename) == blockFiles.end())
00404                                 {
00405                                         // add the bundle to the deletion list
00406                                         corrupt_bundle_ids.insert(id);
00407                                 }
00408                                 else
00409                                 {
00410                                         // file is available, everything is fine
00411                                         blockFiles.erase(filename);
00412                                 }
00413                         }
00414                         sqlite3_reset(blockConistencyCheck);
00415                         sqlite3_finalize(blockConistencyCheck);
00416 
00417                         for(std::set<dtn::data::BundleID>::const_iterator iter = corrupt_bundle_ids.begin(); iter != corrupt_bundle_ids.end(); iter++)
00418                         {
00419                                 try {
00420                                         const dtn::data::BundleID &id = (*iter);
00421 
00422                                         // get meta data of this bundle
00423                                         const dtn::data::MetaBundle m = get_meta(id);
00424 
00425                                         // remove the hole bundle
00426                                         remove(id);
00427                                 } catch (const dtn::core::SQLiteBundleStorage::SQLiteQueryException&) { };
00428                         }
00429 
00430                         // delete block files still in the blockfile list
00431                         for (std::set<std::string>::const_iterator iter = blockFiles.begin(); iter != blockFiles.end(); iter++)
00432                         {
00433                                 ibrcommon::File blockfile(*iter);
00434                                 blockfile.remove();
00435                         }
00436                 }
00437 
00438 
00439                 sqlite3_stmt* SQLiteBundleStorage::prepare(const std::string &sqlquery)
00440                 {
00441                         // prepare the statement
00442                         sqlite3_stmt *statement;
00443                         int err = sqlite3_prepare_v2(_database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00444                         if ( err != SQLITE_OK )
00445                         {
00446                                 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << IBRCOMMON_LOGGER_ENDL;
00447                         }
00448                         return statement;
00449                 }
00450 
00451                 dtn::data::MetaBundle SQLiteBundleStorage::get_meta(const dtn::data::BundleID &id)
00452                 {
00453                         dtn::data::MetaBundle bundle;
00454 
00455                         // check if the bundle is already on the deletion list
00456                         if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00457 
00458                         size_t stmt_key = BUNDLE_GET_ID;
00459                         if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00460 
00461                         // lock the prepared statement
00462                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00463 
00464                         // bind bundle id to the statement
00465                         set_bundleid(_statements[stmt_key], id);
00466 
00467                         // execute the query and check for error
00468                         if (sqlite3_step(_statements[stmt_key]) != SQLITE_ROW)
00469                         {
00470                                 stringstream error;
00471                                 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString();
00472                                 IBRCOMMON_LOGGER_DEBUG(15) << error.str() << IBRCOMMON_LOGGER_ENDL;
00473                                 throw SQLiteQueryException(error.str());
00474                         }
00475 
00476                         // query bundle data
00477                         get(_statements[stmt_key], bundle);
00478 
00479                         return bundle;
00480                 }
00481 
00482                 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset)
00483                 {
00484                         bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00485                         bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00486                         bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00487                         bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00488                         bundle.procflags = sqlite3_column_int(st, offset + 4);
00489                         bundle.timestamp = sqlite3_column_int64(st, offset + 5);
00490                         bundle.sequencenumber = sqlite3_column_int64(st, offset + 6);
00491                         bundle.lifetime = sqlite3_column_int64(st, offset + 7);
00492 
00493                         if (bundle.procflags & data::Bundle::FRAGMENT)
00494                         {
00495                                 bundle.offset = sqlite3_column_int64(st, offset + 8);
00496                                 bundle.appdatalength = sqlite3_column_int64(st, offset + 9);
00497                         }
00498                 }
00499 
00500                 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset)
00501                 {
00502                         bundle._source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00503                         bundle._destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00504                         bundle._reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00505                         bundle._custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00506                         bundle._procflags = sqlite3_column_int(st, offset + 4);
00507                         bundle._timestamp = sqlite3_column_int64(st, offset + 5);
00508                         bundle._sequencenumber = sqlite3_column_int64(st, offset + 6);
00509                         bundle._lifetime = sqlite3_column_int64(st, offset + 7);
00510 
00511                         if (bundle._procflags & data::Bundle::FRAGMENT)
00512                         {
00513                                 bundle._fragmentoffset = sqlite3_column_int64(st, offset + 8);
00514                                 bundle._appdatalength = sqlite3_column_int64(st, offset + 9);
00515                         }
00516                 }
00517 
00518                 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb)
00519                 {
00520                         std::list<dtn::data::MetaBundle> ret;
00521 
00522                         const std::string base_query =
00523                                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE];
00524 
00525                         sqlite3_stmt *st = NULL;
00526                         size_t bind_offset = 1;
00527 
00528                         try {
00529                                 SQLBundleQuery &query = dynamic_cast<SQLBundleQuery&>(cb);
00530 
00531                                 if (query._statement == NULL)
00532                                 {
00533                                         std::string sql = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC";
00534 
00535                                         if (cb.limit() > 0)
00536                                         {
00537                                                 sql += " LIMIT ?,?";
00538                                         }
00539 
00540                                         // add closing delimiter
00541                                         sql += ";";
00542 
00543                                         // prepare the hole statement
00544                                         query._statement = prepare(sql);
00545                                 }
00546 
00547                                 st = query._statement;
00548                                 bind_offset = query.bind(st, 1);
00549                         } catch (const std::bad_cast&) {
00550                                 // this query is not optimized for sql, use the generic way (slow)
00551                                 st = _statements[BUNDLE_GET_FILTER];
00552                         };
00553 
00554                         size_t offset = 0;
00555                         while (true)
00556                         {
00557                                 // lock the database
00558                                 AutoResetLock l(_locks[BUNDLE_GET_FILTER], st);
00559 
00560                                 if (cb.limit() > 0)
00561                                 {
00562                                         sqlite3_bind_int64(st, bind_offset, offset);
00563                                         sqlite3_bind_int64(st, bind_offset + 1, cb.limit());
00564                                 }
00565 
00566                                 if (sqlite3_step(st) == SQLITE_DONE)
00567                                 {
00568                                         // returned no result
00569                                         break;
00570                                 }
00571 
00572                                 while (true)
00573                                 {
00574                                         dtn::data::MetaBundle m;
00575 
00576                                         // extract the primary values and set them in the bundle object
00577                                         get(st, m, 0);
00578 
00579                                         // check if the bundle is already on the deletion list
00580                                         if (!contains_deletion(m))
00581                                         {
00582                                                 // ask the filter if this bundle should be added to the return list
00583                                                 if (cb.shouldAdd(m))
00584                                                 {
00585                                                         IBRCOMMON_LOGGER_DEBUG(40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL;
00586 
00587                                                         // add the bundle to the list
00588                                                         ret.push_back(m);
00589                                                 }
00590                                         }
00591 
00592                                         if (sqlite3_step(st) != SQLITE_ROW)
00593                                         {
00594                                                 break;
00595                                         }
00596 
00597                                         // abort if enough bundles are found
00598                                         if ((cb.limit() > 0) && (ret.size() >= cb.limit())) break;
00599                                 }
00600 
00601                                 // if a limit is set
00602                                 if (cb.limit() > 0)
00603                                 {
00604                                         // increment the offset, because we might not have enough
00605                                         offset += cb.limit();
00606 
00607                                         // abort if enough bundles are found
00608                                         if (ret.size() >= cb.limit()) break;
00609                                 }
00610                                 else
00611                                 {
00612                                         break;
00613                                 }
00614                         }
00615 
00616                         return ret;
00617                 }
00618 
00619                 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id)
00620                 {
00621                         dtn::data::Bundle bundle;
00622                         int err = 0;
00623 
00624                         IBRCOMMON_LOGGER_DEBUG(25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00625 
00626                         // if bundle is deleted?
00627                         if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00628 
00629                         size_t stmt_key = BUNDLE_GET_ID;
00630                         if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00631 
00632                         // do this while db is locked
00633                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00634 
00635                         // set the bundle key values
00636                         set_bundleid(_statements[stmt_key], id);
00637 
00638                         // execute the query and check for error
00639                         if ((err = sqlite3_step(_statements[stmt_key])) != SQLITE_ROW)
00640                         {
00641                                 IBRCOMMON_LOGGER_DEBUG(15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00642                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00643                         }
00644 
00645                         // read bundle data
00646                         get(_statements[stmt_key], bundle);
00647 
00648                         try {
00649                                 // read all blocks
00650                                 get_blocks(bundle, id);
00651                         } catch (const ibrcommon::Exception &ex) {
00652                                 IBRCOMMON_LOGGER(error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00653                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00654                         }
00655 
00656                         return bundle;
00657                 }
00658 
00659 #ifdef SQLITE_STORAGE_EXTENDED
00660                 void SQLiteBundleStorage::setPriority(const int priority, const dtn::data::BundleID &id)
00661                 {
00662                         int err;
00663                         size_t procflags;
00664                         {
00665                                 sqlite3_bind_text(_statements[PROCFLAGS_GET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00666                                 err = sqlite3_step(_statements[PROCFLAGS_GET]);
00667                                 if(err == SQLITE_ROW)
00668                                         procflags = sqlite3_column_int(_statements[PROCFLAGS_GET],0);
00669                                 sqlite3_reset(_statements[PROCFLAGS_GET]);
00670                                 if(err != SQLITE_DONE){
00671                                         stringstream error;
00672                                         error << "SQLiteBundleStorage: error while Select Querry: " << err;
00673                                         IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00674                                         throw SQLiteQueryException(error.str());
00675                                 }
00676 
00677                                 //Priority auf 0 Setzen
00678                                 procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2));
00679 
00680                                 //Set Prioritybits
00681                                 switch(priority){
00682                                 case 1: procflags += data::Bundle::PRIORITY_BIT1; break; //Set PRIORITY_BIT1
00683                                 case 2: procflags += data::Bundle::PRIORITY_BIT2; break;
00684                                 case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break;
00685                                 }
00686 
00687                                 sqlite3_bind_text(_statements[PROCFLAGS_SET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00688                                 sqlite3_bind_int64(_statements[PROCFLAGS_SET],2,priority);
00689                                 sqlite3_step(_statements[PROCFLAGS_SET]);
00690                                 sqlite3_reset(_statements[PROCFLAGS_SET]);
00691 
00692                                 if(err != SQLITE_DONE)
00693                                 {
00694                                         stringstream error;
00695                                         error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err;
00696                                         IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00697                                         throw SQLiteQueryException(error.str());
00698                                 }
00699                         }
00700                 }
00701 #endif
00702 
00703                 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle)
00704                 {
00705                         IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00706 
00707                         // determine if the bundle is for local delivery
00708                         bool local = (bundle._destination.sameHost(BundleCore::local)
00709                                         && (bundle._procflags & dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON));
00710 
00711 //                      // if the bundle is a fragment store it as one
00712 //                      if (bundle.get(dtn::data::Bundle::FRAGMENT) && local)
00713 //                      {
00714 //                              storeFragment(bundle);
00715 //                              return;
00716 //                      }
00717 
00718                         int err;
00719 
00720                         const dtn::data::EID _sourceid = bundle._source;
00721                         size_t TTL = bundle._timestamp + bundle._lifetime;
00722 
00723                         AutoResetLock l(_locks[BUNDLE_STORE], _statements[BUNDLE_STORE]);
00724 
00725                         set_bundleid(_statements[BUNDLE_STORE], bundle);
00726 
00727                         sqlite3_bind_text(_statements[BUNDLE_STORE], 5, bundle._source.getString().c_str(), bundle._source.getString().length(), SQLITE_TRANSIENT);
00728                         sqlite3_bind_text(_statements[BUNDLE_STORE], 6, bundle._destination.getString().c_str(), bundle._destination.getString().length(), SQLITE_TRANSIENT);
00729                         sqlite3_bind_text(_statements[BUNDLE_STORE], 7, bundle._reportto.getString().c_str(), bundle._reportto.getString().length(), SQLITE_TRANSIENT);
00730                         sqlite3_bind_text(_statements[BUNDLE_STORE], 8, bundle._custodian.getString().c_str(), bundle._custodian.getString().length(), SQLITE_TRANSIENT);
00731                         sqlite3_bind_int(_statements[BUNDLE_STORE], 9, bundle._procflags);
00732                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 10, bundle._lifetime);
00733 
00734                         if (bundle.get(dtn::data::Bundle::FRAGMENT))
00735                         {
00736                                 sqlite3_bind_int64(_statements[BUNDLE_STORE], 11, bundle._appdatalength);
00737                         }
00738                         else
00739                         {
00740                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 4);
00741                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 11);
00742                         }
00743 
00744                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 12, TTL);
00745                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 13, dtn::data::MetaBundle(bundle).getPriority());
00746 
00747                         // start a transaction
00748                         sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, NULL);
00749 
00750                         // store the bundle data in the database
00751                         err = sqlite3_step(_statements[BUNDLE_STORE]);
00752 
00753                         if (err == SQLITE_CONSTRAINT)
00754                         {
00755                                 IBRCOMMON_LOGGER(warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
00756                         }
00757                         else if (err != SQLITE_DONE)
00758                         {
00759                                 stringstream error;
00760                                 error << "SQLiteBundleStorage: store() failure: " << err << " " <<  sqlite3_errmsg(_database);
00761                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00762 
00763                                 // rollback the whole transaction
00764                                 sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, NULL);
00765 
00766                                 throw SQLiteQueryException(error.str());
00767                         }
00768                         else
00769                         {
00770                                 // stores the blocks to a file
00771                                 store_blocks(bundle);
00772 
00773                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
00774                         }
00775 
00776                         // commit the transaction
00777                         sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, NULL);
00778 
00779                         // set new expire time
00780                         new_expire_time(TTL);
00781 
00782                         try {
00783                                 // the bundle is stored sucessfully, we could accept custody if it is requested
00784                                 const dtn::data::EID custodian = acceptCustody(bundle);
00785 
00786                                 // update the custody address of this bundle
00787                                 update_custodian(bundle, custodian);
00788                         } catch (const ibrcommon::Exception&) {
00789                                 // this bundle has no request for custody transfers
00790                         }
00791                 }
00792 
00793                 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id)
00794                 {
00795                         add_deletion(id);
00796                         _tasks.push(new TaskRemove(id));
00797                 }
00798 
00799                 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage)
00800                 {
00801                         {
00802                                 // select the right statement to use
00803                                 const size_t stmt_key = _id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
00804 
00805                                 // lock the database
00806                                 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00807 
00808                                 // set the bundle key values
00809                                 storage.set_bundleid(storage._statements[stmt_key], _id);
00810 
00811                                 // step through all blocks
00812                                 while (sqlite3_step(storage._statements[stmt_key]) == SQLITE_ROW)
00813                                 {
00814                                         // delete each referenced block file
00815                                         ibrcommon::File blockfile( (const char*)sqlite3_column_text(storage._statements[stmt_key], 0) );
00816                                         blockfile.remove();
00817                                 }
00818                         }
00819 
00820                         {
00821                                 const size_t stmt_key = _id.fragment ? FRAGMENT_DELETE : BUNDLE_DELETE;
00822 
00823                                 // lock the database
00824                                 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00825 
00826                                 // then remove the bundle data
00827                                 storage.set_bundleid(storage._statements[stmt_key], _id);
00828                                 sqlite3_step(storage._statements[stmt_key]);
00829 
00830                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << _id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL;
00831                         }
00832 
00833                         //update deprecated timer
00834                         storage.update_expire_time();
00835 
00836                         // remove it from the deletion list
00837                         storage.remove_deletion(_id);
00838                 }
00839 
00840 #ifdef SQLITE_STORAGE_EXTENDED
00841                 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00842                 {
00843                         int err;
00844                         string result;
00845 
00846                         sqlite3_bind_text(_statements[ROUTING_GET],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00847                         sqlite3_bind_int(_statements[ROUTING_GET],2,key);
00848                         err = sqlite3_step(_statements[ROUTING_GET]);
00849                         if(err == SQLITE_ROW){
00850                                 result = (const char*) sqlite3_column_text(_statements[ROUTING_GET],0);
00851                         }
00852                         sqlite3_reset(_statements[ROUTING_GET]);
00853                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00854                                 stringstream error;
00855                                 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err;
00856                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00857                                 throw SQLiteQueryException(error.str());
00858                         }
00859 
00860                         return result;
00861                 }
00862 
00863                 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key)
00864                 {
00865                         int err;
00866                         string result;
00867 
00868                         err = sqlite3_bind_text(_statements[NODE_GET],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00869                         err = sqlite3_bind_int(_statements[NODE_GET],2,key);
00870                         err = sqlite3_step(_statements[NODE_GET]);
00871                         if(err == SQLITE_ROW){
00872                                 result = (const char*) sqlite3_column_text(_statements[NODE_GET],0);
00873                         }
00874                         sqlite3_reset(_statements[NODE_GET]);
00875                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00876                                 stringstream error;
00877                                 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00878                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00879                                 throw SQLiteQueryException(error.str());
00880                         }
00881 
00882                         return result;
00883                 }
00884 
00885                 std::string SQLiteBundleStorage::getRoutingInfo(const int &key)
00886                 {
00887                         int err;
00888                         string result;
00889 
00890                         sqlite3_bind_int(_statements[INFO_GET],1,key);
00891                         err = sqlite3_step(_statements[INFO_GET]);
00892                         if(err == SQLITE_ROW){
00893                                 result = (const char*) sqlite3_column_text(_statements[INFO_GET],0);
00894                         }
00895                         sqlite3_reset(_statements[INFO_GET]);
00896                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00897                                 stringstream error;
00898                                 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00899                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00900                                 throw SQLiteQueryException(error.str());
00901                         }
00902 
00903                         return result;
00904                 }
00905 
00906                 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo)
00907                 {
00908                         int err;
00909 
00910                         sqlite3_bind_text(_statements[ROUTING_STORE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00911                         sqlite3_bind_int(_statements[ROUTING_STORE],2,key);
00912                         sqlite3_bind_text(_statements[ROUTING_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00913                         err = sqlite3_step(_statements[ROUTING_STORE]);
00914                         sqlite3_reset(_statements[ROUTING_STORE]);
00915                         if(err == SQLITE_CONSTRAINT)
00916                                 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl;
00917                         else if(err != SQLITE_DONE){
00918                                 stringstream error;
00919                                 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err;
00920                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00921                                 throw SQLiteQueryException(error.str());
00922                         }
00923                 }
00924 
00925                 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo)
00926                 {
00927                         int err;
00928 
00929                         sqlite3_bind_text(_statements[NODE_STORE],1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT);
00930                         sqlite3_bind_int(_statements[NODE_STORE],2,key);
00931                         sqlite3_bind_text(_statements[NODE_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00932                         err = sqlite3_step(_statements[NODE_STORE]);
00933                         sqlite3_reset(_statements[NODE_STORE]);
00934                         if(err == SQLITE_CONSTRAINT)
00935                                 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl;
00936                         else if(err != SQLITE_DONE){
00937                                 stringstream error;
00938                                 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err;
00939                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00940                                 throw SQLiteQueryException(error.str());
00941                         }
00942                 }
00943 
00944                 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo)
00945                 {
00946                         int err;
00947 
00948                         sqlite3_bind_int(_statements[INFO_STORE],1,key);
00949                         sqlite3_bind_text(_statements[INFO_STORE],2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00950                         err = sqlite3_step(_statements[INFO_STORE]);
00951                         sqlite3_reset(_statements[INFO_STORE]);
00952                         if(err == SQLITE_CONSTRAINT)
00953                                 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl;
00954                         else if(err != SQLITE_DONE){
00955                                 stringstream error;
00956                                 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err;
00957                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00958                                 throw SQLiteQueryException(error.str());
00959                         }
00960                 }
00961 
00962                 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00963                 {
00964                         int err;
00965 
00966                         sqlite3_bind_text(_statements[ROUTING_REMOVE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00967                         sqlite3_bind_int(_statements[ROUTING_REMOVE],2,key);
00968                         err = sqlite3_step(_statements[ROUTING_REMOVE]);
00969                         sqlite3_reset(_statements[ROUTING_REMOVE]);
00970                         if(err != SQLITE_DONE){
00971                                 stringstream error;
00972                                 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err;
00973                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00974                                 throw SQLiteQueryException(error.str());
00975                         }
00976                 }
00977 
00978                 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key)
00979                 {
00980                         int err;
00981 
00982                         sqlite3_bind_text(_statements[NODE_REMOVE],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00983                         sqlite3_bind_int(_statements[NODE_REMOVE],2,key);
00984                         err = sqlite3_step(_statements[NODE_REMOVE]);
00985                         sqlite3_reset(_statements[NODE_REMOVE]);
00986                         if(err != SQLITE_DONE){
00987                                 stringstream error;
00988                                 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err;
00989                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00990                                 throw SQLiteQueryException(error.str());
00991                         }
00992                 }
00993 
00994                 void SQLiteBundleStorage::removeRoutingInfo(const int &key)
00995                 {
00996                         int err;
00997 
00998                         sqlite3_bind_int(_statements[INFO_REMOVE],1,key);
00999                         err = sqlite3_step(_statements[INFO_REMOVE]);
01000                         sqlite3_reset(_statements[INFO_REMOVE]);
01001                         if(err != SQLITE_DONE){
01002                                 stringstream error;
01003                                 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err;
01004                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01005                                 throw SQLiteQueryException(error.str());
01006                         }
01007                 }
01008 #endif
01009 
01010                 void SQLiteBundleStorage::clearAll()
01011                 {
01012 #ifdef SQLITE_STORAGE_EXTENDED
01013                         sqlite3_step(_statements[ROUTING_CLEAR]);
01014                         sqlite3_reset(_statements[ROUTING_CLEAR]);
01015                         sqlite3_step(_statements[NODE_CLEAR]);
01016                         sqlite3_reset(_statements[NODE_CLEAR]);
01017 #endif
01018 
01019                         clear();
01020                 }
01021 
01022                 void SQLiteBundleStorage::clear()
01023                 {
01024                         AutoResetLock l(_locks[VACUUM], _statements[VACUUM]);
01025 
01026                         sqlite3_step(_statements[BUNDLE_CLEAR]);
01027                         sqlite3_reset(_statements[BUNDLE_CLEAR]);
01028                         sqlite3_step(_statements[BLOCK_CLEAR]);
01029                         sqlite3_reset(_statements[BLOCK_CLEAR]);
01030 
01031                         if(SQLITE_DONE != sqlite3_step(_statements[VACUUM]))
01032                         {
01033                                 sqlite3_reset(_statements[VACUUM]);
01034                                 throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed.");
01035                         }
01036 
01037                         //Delete Folder SQL_TABLE_BLOCK containing Blocks
01038                         {
01039                                 _blockPath.remove(true);
01040                                 ibrcommon::File::createDirectory(_blockPath);
01041                         }
01042 
01043                         reset_expire_time();
01044                 }
01045 
01046 
01047 
01048                 bool SQLiteBundleStorage::empty()
01049                 {
01050                         AutoResetLock l(_locks[EMPTY_CHECK], _statements[EMPTY_CHECK]);
01051 
01052                         if (SQLITE_DONE == sqlite3_step(_statements[EMPTY_CHECK]))
01053                         {
01054                                 return true;
01055                         }
01056                         else
01057                         {
01058                                 return false;
01059                         }
01060                 }
01061 
01062                 unsigned int SQLiteBundleStorage::count()
01063                 {
01064                         int rows = 0;
01065                         int err = 0;
01066 
01067                         AutoResetLock l(_locks[COUNT_ENTRIES], _statements[COUNT_ENTRIES]);
01068 
01069                         if ((err = sqlite3_step(_statements[COUNT_ENTRIES])) == SQLITE_ROW)
01070                         {
01071                                 rows = sqlite3_column_int(_statements[COUNT_ENTRIES], 0);
01072                         }
01073                         else
01074                         {
01075                                 stringstream error;
01076                                 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(_database);
01077                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01078                                 throw SQLiteQueryException(error.str());
01079                         }
01080 
01081                         return rows;
01082                 }
01083 
01084 #ifdef SQLITE_STORAGE_EXTENDED
01085                 int SQLiteBundleStorage::occupiedSpace()
01086                 {
01087                         int size = 0;
01088                         std::list<ibrcommon::File> files;
01089 
01090                         if (_blockPath.getFiles(files))
01091                         {
01092                                 IBRCOMMON_LOGGER(error) << "occupiedSpace: unable to open Directory " << _blockPath.getPath() << IBRCOMMON_LOGGER_ENDL;
01093                                 return -1;
01094                         }
01095 
01096                         for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
01097                         {
01098                                 const ibrcommon::File &f = (*iter);
01099 
01100                                 if (!f.isSystem())
01101                                 {
01102                                         size += f.size();
01103                                 }
01104                         }
01105 
01106                         // add db file size
01107                         size += dbFile.size();
01108 
01109                         return size;
01110                 }
01111 
01112                 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle)
01113                 {
01114                         int err = 0;
01115                         size_t bitcounter = 0;
01116                         bool allFragmentsReceived(true);
01117                         size_t payloadsize, TTL, fragmentoffset;
01118 
01119                         ibrcommon::File payloadfile, filename;
01120 
01121                         // generate a bundle id string
01122                         std::string bundleID = dtn::data::BundleID(bundle).toString();
01123 
01124                         // get the destination id string
01125                         std::string destination = bundle._destination.getString();
01126 
01127                         // get the source id string
01128                         std::string sourceEID = bundle._source.getString();
01129 
01130                         // calculate the expiration timestamp
01131                         TTL = bundle._timestamp + bundle._lifetime;
01132 
01133                         const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>();
01134                         ibrcommon::BLOB::Reference payloadBlob = block.getBLOB();
01135 
01136                         // get the payload size
01137                         {
01138                                 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01139                                 payloadsize = io.size();
01140                         }
01141 
01142                         //Saves the blocks from the first and the last Fragment, they may contain Extentionblock
01143                         bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength;
01144                         bool first = bundle._fragmentoffset == 0;
01145 
01146                         if(first || last)
01147                         {
01148                                 int i = 0, blocknumber = 0;
01149 
01150                                 // get the list of blocks
01151                                 std::list<const dtn::data::Block*> blocklist = bundle.getBlocks();
01152 
01153                                 // iterate through all blocks
01154                                 for (std::list<const dtn::data::Block* >::const_iterator it = blocklist.begin(); it != blocklist.end(); it++, i++)
01155                                 {
01156                                         // generate a temporary block file
01157                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01158 
01159                                         ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01160 
01161                                         dtn::data::SeparateSerializer serializer(datei);
01162                                         serializer << (*(*it));
01163                                         datei.close();
01164 
01165                                         /* Schreibt Blöcke in die DB.
01166                                          * Die Blocknummern werden nach folgendem Schema in die DB geschrieben:
01167                                          *              Die Blöcke des ersten Fragments fangen bei -x an und gehen bis -1
01168                                          *              Die Blöcke des letzten Fragments beginnen bei 1 und gehen bis x
01169                                          *              Der Payloadblock wird an Stelle 0 eingefügt.
01170                                          */
01171                                         sqlite3_bind_text(_statements[BLOCK_STORE], 1, bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01172                                         sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01173                                         sqlite3_bind_text(_statements[BLOCK_STORE], 3, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01174 
01175                                         blocknumber = blocklist.size() - i;
01176 
01177                                         if (first)
01178                                         {
01179                                                 blocknumber = (blocklist.size() - i)*(-1);
01180                                         }
01181 
01182                                         sqlite3_bind_int(_statements[BLOCK_STORE],4,blocknumber);
01183                                         executeQuery(_statements[BLOCK_STORE]);
01184                                 }
01185                         }
01186 
01187                         //At first the database is checked for already received bundles and some
01188                         //informations are read, which effect the following program flow.
01189                         sqlite3_bind_text(_statements[BUNDLE_GET_FRAGMENT],1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT);
01190                         sqlite3_bind_int64(_statements[BUNDLE_GET_FRAGMENT],2,bundle._timestamp);
01191                         sqlite3_bind_int(_statements[BUNDLE_GET_FRAGMENT],3,bundle._sequencenumber);
01192                         err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01193 
01194                         //Es ist noch kein Eintrag vorhanden: generiere Payloadfilename
01195                         if(err == SQLITE_DONE)
01196                         {
01197                                 // generate a temporary block file
01198                                 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01199                                 payloadfile = tmpfile;
01200                         }
01201                         //Es ist bereits ein eintrag vorhanden: speicher den payloadfilename
01202                         else if (err == SQLITE_ROW)
01203                         {
01204                                 payloadfile = ibrcommon::File( (const char*)sqlite3_column_text(_statements[BUNDLE_GET_FRAGMENT], 14) );                                //14 = Payloadfilename
01205                         }
01206 
01207                         while (err == SQLITE_ROW){
01208                                 //The following codepice summs the payloadfragmentbytes, to check if all fragments were received
01209                                 //and the bundle is complete.
01210                                 fragmentoffset = sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],9);                                                                // 9 = fragmentoffset
01211 
01212                                 if(bitcounter >= fragmentoffset){
01213                                         bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],13);    //13 = payloadlength
01214                                 }
01215                                 else if(bitcounter >= bundle._fragmentoffset){
01216                                         bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01217                                 }
01218                                 else{
01219                                         allFragmentsReceived = false;
01220                                 }
01221                                 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01222                         }
01223 
01224                         if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived) //Wenn das aktuelle fragment das größte ist, ist die Liste durchlaufen und das Frag nicht dazugezählt. Dies wird hier nachgeholt.
01225                         {
01226                                 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01227                         }
01228 
01229                         sqlite3_reset(_statements[BUNDLE_GET_FRAGMENT]);
01230                         if(err != SQLITE_DONE){
01231                                 stringstream error;
01232                                 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(_database);
01233                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01234                                 throw SQLiteQueryException(error.str());
01235                         }
01236 
01237                         //Save the Payload in a separate file
01238                         std::fstream datei(payloadfile.getPath().c_str(), std::ios::in | std::ios::out | std::ios::binary);
01239                         datei.seekp(bundle._fragmentoffset);
01240                         {
01241                                 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01242 
01243                                 size_t ret = 0;
01244                                 istream &s = (*io);
01245                                 s.seekg(0);
01246 
01247                                 while (true)
01248                                 {
01249                                         char buf;
01250                                         s.get(buf);
01251                                         if(!s.eof()){
01252                                                 datei.put(buf);
01253                                                 ret++;
01254                                         }
01255                                         else
01256                                                 break;
01257                                 }
01258                                 datei.close();
01259                         }
01260 
01261                         //All fragments received
01262                         if(bundle._appdatalength == bitcounter)
01263                         {
01264                                 /*
01265                                  * 1. Payloadblock zusammensetzen
01266                                  * 2. PayloadBlock in der BlockTable sichern
01267                                  * 3. Primary BlockInfos in die BundleTable eintragen.
01268                                  * 4. Einträge der Fragmenttabelle löschen
01269                                  */
01270 
01271                                 // 1. Get the payloadblockobject
01272                                 {
01273                                         ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(payloadfile));
01274                                         dtn::data::PayloadBlock pb(ref);
01275 
01276                                         //Copy EID list
01277                                         if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS))
01278                                         {
01279                                                 std::list<dtn::data::EID> eidlist;
01280                                                 std::list<dtn::data::EID>::iterator eidIt;
01281                                                 eidlist = block.getEIDList();
01282 
01283                                                 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++)
01284                                                 {
01285                                                         pb.addEID(*eidIt);
01286                                                 }
01287                                         }
01288 
01289                                         //Copy ProcFlags
01290                                         pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT));
01291                                         pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED));
01292                                         pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED));
01293                                         pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK));
01294                                         pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED));
01295                                         pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED));
01296                                         pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS));
01297 
01298                                         //2. Save the PayloadBlock to a file and store the metainformation in the BlockTable
01299                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01300 
01301                                         std::ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01302                                         dtn::data::SeparateSerializer serializer(datei);
01303                                         serializer << (pb);
01304                                         datei.close();
01305 
01306                                         filename = tmpfile;
01307                                 }
01308 
01309                                 sqlite3_bind_text(_statements[BLOCK_STORE], 1,bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01310 //                              sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01311                                 sqlite3_bind_text(_statements[BLOCK_STORE], 3, filename.getPath().c_str(), filename.getPath().length(), SQLITE_TRANSIENT);
01312                                 sqlite3_bind_int(_statements[BLOCK_STORE], 4, 0);
01313                                 executeQuery(_statements[BLOCK_STORE]);
01314 
01315                                 //3. Delete reassembled Payload data
01316                                 payloadfile.remove();
01317 
01318                                 //4. Remove Fragment entries from BundleTable
01319                                 sqlite3_bind_text(_statements[BUNDLE_DELETE],1,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01320                                 sqlite3_bind_int64(_statements[BUNDLE_DELETE],2,bundle._sequencenumber);
01321                                 sqlite3_bind_int64(_statements[BUNDLE_DELETE],3,bundle._timestamp);
01322                                 executeQuery(_statements[BUNDLE_DELETE]);
01323 
01324                                 //5. Write the Primary Block to the BundleTable
01325                                 size_t procFlags = bundle._procflags;
01326                                 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT);
01327 
01328                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01329                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01330                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01331                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01332                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01333                                 sqlite3_bind_int(_statements[BUNDLE_STORE],6,procFlags);
01334                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01335                                 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01336                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01337                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01338                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01339                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01340                                 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01341                                 sqlite3_bind_int(_statements[BUNDLE_STORE],14,NULL);
01342                                 sqlite3_bind_text(_statements[BUNDLE_STORE],15,NULL,0,SQLITE_TRANSIENT);
01343                                 executeQuery(_statements[BUNDLE_STORE]);
01344                         }
01345 
01346                         //There are some fragments missing
01347                         else{
01348                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01349                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01350                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01351                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01352                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01353                                 sqlite3_bind_int(_statements[BUNDLE_STORE],6,bundle._procflags);
01354                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01355                                 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01356                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01357                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01358                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01359                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01360                                 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01361                                 sqlite3_bind_int(_statements[BUNDLE_STORE],14,payloadsize);
01362                                 sqlite3_bind_text(_statements[BUNDLE_STORE],15,payloadfile.getPath().c_str(),payloadfile.getPath().length(),SQLITE_TRANSIENT);
01363                                 executeQuery(_statements[BUNDLE_STORE]);
01364                         }
01365                 }
01366 #endif
01367 
01368                 void SQLiteBundleStorage::raiseEvent(const Event *evt)
01369                 {
01370                         try {
01371                                 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
01372 
01373                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
01374                                 {
01375                                         _tasks.push(new TaskExpire(time.getTimestamp()));
01376                                 }
01377                         } catch (const std::bad_cast&) { }
01378                         
01379                         try {
01380                                 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt);
01381 
01382                                 if(global.getAction() == GlobalEvent::GLOBAL_IDLE)
01383                                 {
01384                                         // switch to idle mode
01385                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
01386                                         TaskIdle::_idle = true;
01387 
01388                                         // generate an idle task
01389                                         _tasks.push(new TaskIdle());
01390                                 }
01391                                 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY)
01392                                 {
01393                                         // switch back to non-idle mode
01394                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
01395                                         TaskIdle::_idle = false;
01396                                 }
01397                         } catch (const std::bad_cast&) { }
01398                 }
01399 
01400                 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
01401                 {
01402                         /*
01403                          * Only if the actual time is bigger or equal than the time when the next bundle expires, deleteexpired is called.
01404                          */
01405                         size_t exp_time = storage.get_expire_time();
01406                         if ((_timestamp < exp_time) || (exp_time == 0)) return;
01407 
01408                         /*
01409                          * Performanceverbesserung: Damit die Abfragen nicht jede Sekunde ausgeführt werden müssen, speichert man den Zeitpunkt an dem
01410                          * das nächste Bündel gelöscht werden soll in eine Variable und fürhrt deleteexpired erst dann aus wenn ein Bündel abgelaufen ist.
01411                          * Nach dem Löschen wird die DB durchsucht und der nächste Ablaufzeitpunkt wird in die Variable gesetzt.
01412                          */
01413 
01414                         {
01415                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_FILENAMES], storage._statements[EXPIRE_BUNDLE_FILENAMES]);
01416 
01417                                 // query for blocks of expired bundles
01418                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_FILENAMES], 1, _timestamp);
01419                                 while (sqlite3_step(storage._statements[EXPIRE_BUNDLE_FILENAMES]) == SQLITE_ROW)
01420                                 {
01421                                         ibrcommon::File block((const char*)sqlite3_column_text(storage._statements[EXPIRE_BUNDLE_FILENAMES],0));
01422                                         block.remove();
01423                                 }
01424                         }
01425 
01426                         {
01427                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLES], storage._statements[EXPIRE_BUNDLES]);
01428 
01429                                 // query expired bundles
01430                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLES], 1, _timestamp);
01431                                 while (sqlite3_step(storage._statements[EXPIRE_BUNDLES]) == SQLITE_ROW)
01432                                 {
01433                                         dtn::data::BundleID id;
01434                                         storage.get_bundleid(storage._statements[EXPIRE_BUNDLES], id);
01435                                         dtn::core::BundleExpiredEvent::raise(id);
01436                                 }
01437                         }
01438 
01439                         {
01440                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_DELETE], storage._statements[EXPIRE_BUNDLE_DELETE]);
01441 
01442                                 // delete all expired db entries (bundles and blocks)
01443                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_DELETE], 1, _timestamp);
01444                                 sqlite3_step(storage._statements[EXPIRE_BUNDLE_DELETE]);
01445                         }
01446 
01447                         //update deprecated timer
01448                         storage.update_expire_time();
01449                 }
01450 
01451                 void SQLiteBundleStorage::update_expire_time()
01452                 {
01453                         ibrcommon::MutexLock l(_locks[EXPIRE_NEXT_TIMESTAMP]);
01454                         int err = sqlite3_step(_statements[EXPIRE_NEXT_TIMESTAMP]);
01455 
01456                         if (err == SQLITE_ROW)
01457                         {
01458                                 ibrcommon::MutexLock l(_next_expiration_lock);
01459                                 _next_expiration = sqlite3_column_int64(_statements[EXPIRE_NEXT_TIMESTAMP], 0);
01460                         }
01461                         else
01462                         {
01463                                 ibrcommon::MutexLock l(_next_expiration_lock);
01464                                 _next_expiration = 0;
01465                         }
01466 
01467                         sqlite3_reset(_statements[EXPIRE_NEXT_TIMESTAMP]);
01468                 }
01469 
01470                 void SQLiteBundleStorage::update_custodian(const dtn::data::BundleID &id, const dtn::data::EID &custodian)
01471                 {
01472                         // select query with or without fragmentation extension
01473                         STORAGE_STMT query = BUNDLE_UPDATE_CUSTODIAN;
01474                         if (id.fragment)        query = FRAGMENT_UPDATE_CUSTODIAN;
01475 
01476 
01477                         AutoResetLock l(_locks[query], _statements[query]);
01478 
01479                         sqlite3_bind_text(_statements[query], 1, custodian.getString().c_str(), custodian.getString().length(), SQLITE_TRANSIENT);
01480                         set_bundleid(_statements[query], id, 1);
01481 
01482                         // update the custodian in the database
01483                         int err = sqlite3_step(_statements[query]);
01484 
01485                         if (err != SQLITE_DONE)
01486                         {
01487                                 stringstream error;
01488                                 error << "SQLiteBundleStorage: update_custodian() failure: " << err << " " <<  sqlite3_errmsg(_database);
01489                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01490                         }
01491                 }
01492 
01493                 int SQLiteBundleStorage::store_blocks(const data::Bundle &bundle)
01494                 {
01495                         int blocktyp, blocknumber(1), storedBytes(0);
01496 
01497                         // get all blocks of the bundle
01498                         const list<const data::Block*> blocklist = bundle.getBlocks();
01499 
01500                         // generate a bundle id
01501                         data::BundleID id(bundle);
01502 
01503                         for(std::list<const data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++)
01504                         {
01505                                 blocktyp = (int)(*it)->getType();
01506 
01507                                 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01508                                 std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary);
01509 
01510                                 dtn::data::SeparateSerializer serializer(filestream);
01511                                 serializer << (*(*it));
01512                                 storedBytes += serializer.getLength(*(*it));
01513 
01514                                 filestream.close();
01515 
01516                                 // protect this query from concurrent access and enable the auto-reset feature
01517                                 AutoResetLock l(_locks[BLOCK_STORE], _statements[BLOCK_STORE]);
01518 
01519                                 // set bundle key data
01520                                 set_bundleid(_statements[BLOCK_STORE], id);
01521 
01522                                 // set the column four to null if this is not a fragment
01523                                 if (!id.fragment) sqlite3_bind_null(_statements[BLOCK_STORE], 4);
01524 
01525                                 // set the block type
01526                                 sqlite3_bind_int(_statements[BLOCK_STORE], 5, blocktyp);
01527 
01528                                 // the filename of the block data
01529                                 sqlite3_bind_text(_statements[BLOCK_STORE], 6, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01530 
01531                                 // the ordering number
01532                                 sqlite3_bind_int(_statements[BLOCK_STORE], 7, blocknumber);
01533 
01534                                 // execute the query and store the block in the database
01535                                 if (sqlite3_step(_statements[BLOCK_STORE]) != SQLITE_DONE)
01536                                 {
01537                                         throw SQLiteQueryException("can not store block of bundle");
01538                                 }
01539 
01540                                 //increment blocknumber
01541                                 blocknumber++;
01542                         }
01543 
01544                         return storedBytes;
01545                 }
01546 
01547                 void SQLiteBundleStorage::get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id)
01548                 {
01549                         int err = 0;
01550                         string file;
01551 
01552                         // select the right statement to use
01553                         const size_t stmt_key = id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
01554 
01555                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
01556 
01557                         // set the bundle key values
01558                         set_bundleid(_statements[stmt_key], id);
01559 
01560                         // query the database and step through all blocks
01561                         while ((err = sqlite3_step(_statements[stmt_key])) == SQLITE_ROW)
01562                         {
01563                                 const ibrcommon::File f( (const char*) sqlite3_column_text(_statements[stmt_key], 0) );
01564 
01565                                 // open the file
01566                                 std::ifstream is(f.getPath().c_str(), std::ios::binary | std::ios::in);
01567 
01568                                 // read the block
01569                                 dtn::data::Block &block = dtn::data::SeparateDeserializer(is, bundle).readBlock();
01570 
01571                                 // close the file
01572                                 is.close();
01573 
01574                                 // modify the age block if present
01575                                 try {
01576                                         dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block);
01577 
01578                                         // modify the AgeBlock with the age of the file
01579                                         time_t age = f.lastaccess() - f.lastmodify();
01580 
01581                                         agebl.addSeconds(age);
01582                                 } catch (const std::bad_cast&) { };
01583                         }
01584 
01585                         if (err == SQLITE_DONE)
01586                         {
01587                                 if (bundle.getBlocks().size() == 0)
01588                                 {
01589                                         IBRCOMMON_LOGGER(error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
01590                                         throw SQLiteQueryException("no blocks found");
01591                                 }
01592                         }
01593                         else
01594                         {
01595                                 IBRCOMMON_LOGGER(error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
01596                                 throw SQLiteQueryException("can not query for blocks");
01597                         }
01598                 }
01599 
01600                 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
01601                 {
01602                         // until IDLE is false
01603                         while (true)
01604                         {
01605                                 /*
01606                                  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
01607                                  * This empty space will be reused the next time new information is added to the database. But in the meantime,
01608                                  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
01609                                  * cause the information in the database to become fragmented - scrattered out all across the database file rather
01610                                  * than clustered together in one place.
01611                                  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
01612                                  * and otherwise cleans up the database file structure.
01613                                  */
01614                                 {
01615                                         AutoResetLock l(storage._locks[VACUUM], storage._statements[VACUUM]);
01616                                         sqlite3_step(storage._statements[VACUUM]);
01617                                 }
01618 
01619                                 // here we can do some IDLE stuff...
01620                                 ::sleep(1);
01621 
01622                                 ibrcommon::MutexLock l(TaskIdle::_mutex);
01623                                 if (!TaskIdle::_idle) return;
01624                         }
01625                 }
01626 
01627                 const std::string SQLiteBundleStorage::getName() const
01628                 {
01629                         return "SQLiteBundleStorage";
01630                 }
01631 
01632                 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
01633                 {
01634                         // custody is successful transferred to another node.
01635                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
01636                         // update the custodian of this bundle with the new one
01637                         update_custodian(id, custodian);
01638                 }
01639 
01640                 void SQLiteBundleStorage::new_expire_time(size_t ttl)
01641                 {
01642                         ibrcommon::MutexLock l(_next_expiration_lock);
01643                         if (_next_expiration == 0 || ttl < _next_expiration)
01644                         {
01645                                 _next_expiration = ttl;
01646                         }
01647                 }
01648 
01649                 void SQLiteBundleStorage::reset_expire_time()
01650                 {
01651                         ibrcommon::MutexLock l(_next_expiration_lock);
01652                         _next_expiration = 0;
01653                 }
01654 
01655                 size_t SQLiteBundleStorage::get_expire_time()
01656                 {
01657                         ibrcommon::MutexLock l(_next_expiration_lock);
01658                         return _next_expiration;
01659                 }
01660 
01661                 void SQLiteBundleStorage::add_deletion(const dtn::data::BundleID &id)
01662                 {
01663                         ibrcommon::MutexLock l(_deletion_mutex);
01664                         _deletion_list.insert(id);
01665                 }
01666 
01667                 void SQLiteBundleStorage::remove_deletion(const dtn::data::BundleID &id)
01668                 {
01669                         ibrcommon::MutexLock l(_deletion_mutex);
01670                         _deletion_list.erase(id);
01671                 }
01672 
01673                 bool SQLiteBundleStorage::contains_deletion(const dtn::data::BundleID &id)
01674                 {
01675                         ibrcommon::MutexLock l(_deletion_mutex);
01676                         return (_deletion_list.find(id) != _deletion_list.end());
01677                 }
01678 
01679                 void SQLiteBundleStorage::trylock() throw (ibrcommon::MutexException)
01680                 {
01681                         // not implemented
01682                         throw ibrcommon::MutexException();
01683                 }
01684 
01685                 void SQLiteBundleStorage::enter() throw (ibrcommon::MutexException)
01686                 {
01687                         // lock all statements
01688                         for (int i = 0; i < SQL_QUERIES_END; i++)
01689                         {
01690                                 // lock the statement
01691                                 _locks[i].enter();
01692                         }
01693                 }
01694 
01695                 void SQLiteBundleStorage::leave() throw (ibrcommon::MutexException)
01696                 {
01697                         // unlock all statements
01698                         for (int i = 0; i < SQL_QUERIES_END; i++)
01699                         {
01700                                 // unlock the statement
01701                                 _locks[i].leave();
01702                         }
01703                 }
01704 
01705                 void SQLiteBundleStorage::set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset) const
01706                 {
01707                         const std::string source_id = id.source.getString();
01708                         sqlite3_bind_text(st, offset + 1, source_id.c_str(), source_id.length(), SQLITE_TRANSIENT);
01709                         sqlite3_bind_int64(st, offset + 2, id.timestamp);
01710                         sqlite3_bind_int64(st, offset + 3, id.sequencenumber);
01711 
01712                         if (id.fragment)
01713                         {
01714                                 sqlite3_bind_int64(st, offset + 4, id.offset);
01715                         }
01716                 }
01717 
01718                 void SQLiteBundleStorage::get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset) const
01719                 {
01720                         id.source = dtn::data::EID((const char*)sqlite3_column_text(st, offset + 0));
01721                         id.timestamp = sqlite3_column_int64(st, offset + 1);
01722                         id.sequencenumber = sqlite3_column_int64(st, offset + 2);
01723 
01724                         id.fragment = (sqlite3_column_text(st, offset + 3) != NULL);
01725                         id.offset = sqlite3_column_int64(st, offset + 3);
01726                 }
01727         }
01728 }