IBR-DTNSuite 0.6

daemon/src/core/SQLiteBundleStorage.cpp

Go to the documentation of this file.
00001 /*
00002  * SQLiteBundleStorage.cpp
00003  *
00004  *  Created on: 09.01.2010
00005  *      Author: Myrtus
00006  */
00007 
00008 
00009 #include "core/SQLiteBundleStorage.h"
00010 #include "core/TimeEvent.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "core/SQLiteConfigure.h"
00014 #include "core/BundleEvent.h"
00015 #include "core/BundleExpiredEvent.h"
00016 
00017 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00018 #include <ibrdtn/data/PayloadBlock.h>
00019 #include <ibrdtn/data/AgeBlock.h>
00020 #include <ibrdtn/data/Serializer.h>
00021 #include <ibrdtn/data/Bundle.h>
00022 #include <ibrdtn/data/BundleID.h>
00023 #include <ibrdtn/utils/Clock.h>
00024 
00025 #include <ibrcommon/thread/MutexLock.h>
00026 #include <ibrcommon/data/BLOB.h>
00027 #include <ibrcommon/AutoDelete.h>
00028 #include <ibrcommon/Logger.h>
00029 
00030 namespace dtn
00031 {
00032         namespace core
00033         {
00034                 void sql_tracer(void*, const char * pQuery)
00035                 {
00036                         IBRCOMMON_LOGGER_DEBUG(50) << "sqlite trace: " << pQuery << IBRCOMMON_LOGGER_ENDL;
00037                 }
00038 
00039                 const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] =
00040                                 { "bundles", "blocks", "routing", "routing_bundles", "routing_nodes" };
00041 
00042                 const std::string SQLiteBundleStorage::_sql_queries[SQL_QUERIES_END] =
00043                 {
00044                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM " + _tables[SQL_TABLE_BUNDLE] + " ORDER BY priority DESC LIMIT ?,?;",
00045                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL LIMIT 1;",
00046                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? LIMIT 1;",
00047                         "SELECT * FROM " + _tables[SQL_TABLE_BUNDLE] + " WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset != NULL ORDER BY fragmentoffset ASC;",
00048 
00049                         "SELECT source, timestamp, sequencenumber, fragmentoffset, procflags FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00050                         "SELECT filename FROM "+ _tables[SQL_TABLE_BUNDLE] +" as a, "+ _tables[SQL_TABLE_BLOCK] +" as b WHERE a.source_id = b.source_id AND a.timestamp = b.timestamp AND a.sequencenumber = b.sequencenumber AND ((a.fragmentoffset = b.fragmentoffset) OR ((a.fragmentoffset IS NULL) AND (b.fragmentoffset IS NULL))) AND a.expiretime <= ?;",
00051                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE expiretime <= ?;",
00052                         "SELECT expiretime FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY expiretime ASC LIMIT 1;",
00053 
00054                         "SELECT ROWID FROM "+ _tables[SQL_TABLE_BUNDLE] +" LIMIT 1;",
00055                         "SELECT COUNT(ROWID) FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00056 
00057                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00058                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00059                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";",
00060                         "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (source_id, timestamp, sequencenumber, fragmentoffset, source, destination, reportto, custodian, procflags, lifetime, appdatalength, expiretime, priority, hopcount) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);",
00061                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL;",
00062                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET custodian = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00063 
00064                         "UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET procflags = ? WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ?;",
00065 
00066                         "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL ORDER BY ordernumber ASC;",
00067                         "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? ORDER BY ordernumber ASC;",
00068                         "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset IS NULL AND ordernumber = ?;",
00069                         "SELECT filename, blocktype FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE source_id = ? AND timestamp = ? AND sequencenumber = ? AND fragmentoffset = ? AND ordernumber = ?;",
00070                         "DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";",
00071                         "INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (source_id, timestamp, sequencenumber, fragmentoffset, blocktype, filename, ordernumber) VALUES (?,?,?,?,?,?,?);",
00072 
00073 #ifdef SQLITE_STORAGE_EXTENDED
00074                         "SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00075                         "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;",
00076                         "DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";",
00077                         "INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);",
00078 
00079                         "SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00080                         "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;",
00081                         "DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";",
00082                         "INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);",
00083 
00084                         "SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00085                         "DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;",
00086                         "INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);",
00087 #endif
00088 
00089                         "VACUUM;"
00090                 };
00091 
00092                 const std::string SQLiteBundleStorage::_db_structure[10] =
00093                 {
00094                         "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BLOCK] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `blocktype` INTEGER NOT NULL, `filename` TEXT NOT NULL, `ordernumber` INTEGER NOT NULL);",
00095                         "CREATE TABLE IF NOT EXISTS `" + _tables[SQL_TABLE_BUNDLE] + "` ( `key` INTEGER PRIMARY KEY ASC, `source_id` TEXT NOT NULL, `source` TEXT NOT NULL, `destination` TEXT NOT NULL, `reportto` TEXT NOT NULL, `custodian` TEXT NOT NULL, `procflags` INTEGER NOT NULL, `timestamp` INTEGER NOT NULL, `sequencenumber` INTEGER NOT NULL, `lifetime` INTEGER NOT NULL, `fragmentoffset` INTEGER DEFAULT NULL, `appdatalength` INTEGER DEFAULT NULL, `expiretime` INTEGER NOT NULL, `priority` INTEGER NOT NULL, `hopcount` INTEGER DEFAULT NULL);",
00096                         "create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);",
00097                         "create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);",
00098                         "create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);",
00099                         "CREATE TRIGGER IF NOT EXISTS blocks_autodelete AFTER DELETE ON " + _tables[SQL_TABLE_BUNDLE] + " FOR EACH ROW BEGIN DELETE FROM " + _tables[SQL_TABLE_BLOCK] + " WHERE " + _tables[SQL_TABLE_BLOCK] + ".source_id = OLD.source_id AND " + _tables[SQL_TABLE_BLOCK] + ".timestamp = OLD.timestamp AND " + _tables[SQL_TABLE_BLOCK] + ".sequencenumber = OLD.sequencenumber AND ((" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset IS NULL AND old.fragmentoffset IS NULL) OR (" + _tables[SQL_TABLE_BLOCK] + ".fragmentoffset = old.fragmentoffset)); END;",
00100                         "CREATE UNIQUE INDEX IF NOT EXISTS blocks_bid ON " + _tables[SQL_TABLE_BLOCK] + " (source_id, timestamp, sequencenumber, fragmentoffset);",
00101                         "CREATE INDEX IF NOT EXISTS bundles_destination ON " + _tables[SQL_TABLE_BUNDLE] + " (destination);",
00102                         "CREATE INDEX IF NOT EXISTS bundles_destination_priority ON " + _tables[SQL_TABLE_BUNDLE] + " (destination, priority);",
00103                         "CREATE UNIQUE INDEX IF NOT EXISTS bundles_id ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset);"
00104                         "CREATE INDEX IF NOT EXISTS bundles_expire ON " + _tables[SQL_TABLE_BUNDLE] + " (source_id, timestamp, sequencenumber, fragmentoffset, expiretime);"
00105                 };
00106 
00107                 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
00108                 bool SQLiteBundleStorage::TaskIdle::_idle = false;
00109 
00110                 SQLiteBundleStorage::SQLBundleQuery::SQLBundleQuery()
00111                  : _statement(NULL)
00112                 { }
00113 
00114                 SQLiteBundleStorage::SQLBundleQuery::~SQLBundleQuery()
00115                 {
00116                         // free the statement if used before
00117                         if (_statement != NULL)
00118                         {
00119                                 sqlite3_finalize(_statement);
00120                         }
00121                 }
00122 
00123                 SQLiteBundleStorage::AutoResetLock::AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st)
00124                  : _lock(mutex), _st(st)
00125                 {
00126                         //sqlite3_clear_bindings(st);
00127                 }
00128 
00129                 SQLiteBundleStorage::AutoResetLock::~AutoResetLock()
00130                 {
00131                         sqlite3_reset(_st);
00132                 }
00133 
00134                 SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path)
00135                  : _blobPath(path)
00136                 {
00137                         // generate a new temporary file
00138                         _file = ibrcommon::TemporaryFile(_blobPath, "blob");
00139                 }
00140 
00141                 SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB()
00142                 {
00143                         // delete the file if the last reference is destroyed
00144                         _file.remove();
00145                 }
00146 
00147                 void SQLiteBundleStorage::SQLiteBLOB::clear()
00148                 {
00149                         // close the file
00150                         _filestream.close();
00151 
00152                         // remove the old file
00153                         _file.remove();
00154 
00155                         // generate a new temporary file
00156                         _file = ibrcommon::TemporaryFile(_blobPath, "blob");
00157 
00158                         // open temporary file
00159                         _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary );
00160 
00161                         if (!_filestream.is_open())
00162                         {
00163                                 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
00164                                 throw ibrcommon::CanNotOpenFileException(_file);
00165                         }
00166                 }
00167 
00168                 void SQLiteBundleStorage::SQLiteBLOB::open()
00169                 {
00170                         ibrcommon::BLOB::_filelimit.wait();
00171 
00172                         // open temporary file
00173                         _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary );
00174 
00175                         if (!_filestream.is_open())
00176                         {
00177                                 IBRCOMMON_LOGGER(error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
00178                                 throw ibrcommon::CanNotOpenFileException(_file);
00179                         }
00180                 }
00181 
00182                 void SQLiteBundleStorage::SQLiteBLOB::close()
00183                 {
00184                         // flush the filestream
00185                         _filestream.flush();
00186 
00187                         // close the file
00188                         _filestream.close();
00189 
00190                         ibrcommon::BLOB::_filelimit.post();
00191                 }
00192 
00193                 size_t SQLiteBundleStorage::SQLiteBLOB::__get_size()
00194                 {
00195                         return _file.size();
00196                 }
00197 
00198                 ibrcommon::BLOB::Reference SQLiteBundleStorage::create()
00199                 {
00200                         return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath));
00201                 }
00202 
00203                 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size)
00204                  : dbPath(path), dbFile(path.get("sqlite.db")), dbSize(size), _next_expiration(0)
00205                 {
00206                         //Configure SQLite Library
00207                         SQLiteConfigure::configure();
00208 
00209                         // check if SQLite is thread-safe
00210                         if (sqlite3_threadsafe() == 0)
00211                         {
00212                                 IBRCOMMON_LOGGER(critical) << "sqlite library has not compiled with threading support." << IBRCOMMON_LOGGER_ENDL;
00213                                 throw ibrcommon::Exception("need threading support in sqlite!");
00214                         }
00215                 }
00216 
00217                 SQLiteBundleStorage::~SQLiteBundleStorage()
00218                 {
00219                         stop();
00220                         join();
00221 
00222                         ibrcommon::MutexLock l(*this);
00223 
00224                         // free all statements
00225                         for (int i = 0; i < SQL_QUERIES_END; i++)
00226                         {
00227                                 // prepare the statement
00228                                 sqlite3_finalize(_statements[i]);
00229                         }
00230 
00231                         //close Databaseconnection
00232                         if (sqlite3_close(_database) != SQLITE_OK)
00233                         {
00234                                 IBRCOMMON_LOGGER(error) << "unable to close database" << IBRCOMMON_LOGGER_ENDL;
00235                         }
00236 
00237                         // shutdown sqlite library
00238                         SQLiteConfigure::shutdown();
00239                 }
00240 
00241                 void SQLiteBundleStorage::openDatabase(const ibrcommon::File &path)
00242                 {
00243                         ibrcommon::MutexLock l(*this);
00244 
00245                         // set the block path
00246                         _blockPath = dbPath.get(_tables[SQL_TABLE_BLOCK]);
00247                         _blobPath = dbPath.get("blob");
00248 
00249                         //open the database
00250                         if (sqlite3_open_v2(path.getPath().c_str(), &_database,  SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL))
00251                         {
00252                                 IBRCOMMON_LOGGER(error) << "Can't open database: " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
00253                                 sqlite3_close(_database);
00254                                 throw ibrcommon::Exception("Unable to open sqlite database");
00255                         }
00256 
00257                         int err = 0;
00258 
00259                         // create all tables
00260                         for (size_t i = 0; i < 10; i++)
00261                         {
00262                                 sqlite3_stmt *st = prepare(_db_structure[i]);
00263                                 err = sqlite3_step(st);
00264                                 if(err != SQLITE_DONE)
00265                                 {
00266                                         IBRCOMMON_LOGGER(error) << "SQLiteBundleStorage: failed to create database" << err << IBRCOMMON_LOGGER_ENDL;
00267                                 }
00268                                 sqlite3_reset(st);
00269                                 sqlite3_finalize(st);
00270                         }
00271 
00272                         // delete all old BLOB container
00273                         _blobPath.remove(true);
00274 
00275                         // create BLOB folder
00276                         ibrcommon::File::createDirectory( _blobPath );
00277 
00278                         // create the bundle folder
00279                         ibrcommon::File::createDirectory( _blockPath );
00280 
00281                         // prepare all statements
00282                         for (int i = 0; i < SQL_QUERIES_END; i++)
00283                         {
00284                                 // prepare the statement
00285                                 int err = sqlite3_prepare_v2(_database, _sql_queries[i].c_str(), _sql_queries[i].length(), &_statements[i], 0);
00286                                 if ( err != SQLITE_OK )
00287                                 {
00288                                         IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failed to prepare statement: " << err << " with query: " << _sql_queries[i] << IBRCOMMON_LOGGER_ENDL;
00289                                 }
00290                         }
00291 
00292                         // disable synchronous mode
00293                         sqlite3_exec(_database, "PRAGMA synchronous = OFF;", NULL, NULL, NULL);
00294 
00295                         // enable sqlite tracing if debug level is higher than 50
00296                         if (IBRCOMMON_LOGGER_LEVEL >= 50)
00297                         {
00298                                 sqlite3_trace(_database, &sql_tracer, NULL);
00299                         }
00300                 }
00301 
00302                 void SQLiteBundleStorage::componentRun()
00303                 {
00304                         // loop until aborted
00305                         try {
00306                                 while (true)
00307                                 {
00308                                         Task *t = _tasks.getnpop(true);
00309 
00310                                         try {
00311                                                 BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
00312                                                 try {
00313                                                         btask.run(*this);
00314                                                 } catch (const std::exception&) {
00315                                                         btask.abort();
00316                                                         continue;
00317                                                 };
00318                                                 btask.done();
00319                                                 continue;
00320                                         } catch (const std::bad_cast&) { };
00321 
00322                                         try {
00323                                                 ibrcommon::AutoDelete<Task> killer(t);
00324                                                 t->run(*this);
00325                                         } catch (const std::exception&) { };
00326                                 }
00327                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00328                                 // we are aborted, abort all blocking tasks
00329                         }
00330                 }
00331 
00332                 void SQLiteBundleStorage::componentUp()
00333                 {
00334                         //register Events
00335                         bindEvent(TimeEvent::className);
00336                         bindEvent(GlobalEvent::className);
00337 
00338                         // open the database and create all folders and files if needed
00339                         openDatabase(dbFile);
00340 
00341                         //Check if the database is consistent with the filesystem
00342                         check_consistency();
00343 
00344                         // calculate next Bundleexpiredtime
00345                         update_expire_time();
00346                 };
00347 
00348                 void SQLiteBundleStorage::componentDown()
00349                 {
00350                         //unregister Events
00351                         unbindEvent(TimeEvent::className);
00352                         unbindEvent(GlobalEvent::className);
00353                 };
00354 
00355                 bool SQLiteBundleStorage::__cancellation()
00356                 {
00357                         _tasks.abort();
00358                         return true;
00359                 }
00360 
00361                 void SQLiteBundleStorage::check_consistency()
00362                 {
00363                         /*
00364                          * check if the database is consistent with the files on the HDD. Therefore every file of the database is put in a list.
00365                          * When the files of the database are scanned it is checked if the actual file is in the list of files of the filesystem. if it is so the entry
00366                          * of the list of files of the filesystem is deleted. if not the file is put in a seperate list. After this procedure there are 2 lists containing file
00367                          * which are inconsistent and can be deleted.
00368                          */
00369                         set<string> blockFiles,payloadfiles;
00370                         string datei;
00371 
00372                         list<ibrcommon::File> blist;
00373                         list<ibrcommon::File>::iterator file_it;
00374 
00375                         //1. Durchsuche die Dateien
00376                         _blockPath.getFiles(blist);
00377                         for(file_it = blist.begin(); file_it != blist.end(); file_it++)
00378                         {
00379                                 datei = (*file_it).getPath();
00380                                 if(datei[datei.size()-1] != '.')
00381                                 {
00382                                         if(((*file_it).getBasename())[0] == 'b')
00383                                         {
00384                                                 blockFiles.insert(datei);
00385                                         }
00386                                         else
00387                                                 payloadfiles.insert(datei);
00388                                 }
00389                         }
00390 
00391                         //3. Check consistency of the bundles
00392                         check_bundles(blockFiles);
00393 
00394                         //4. Check consistency of the fragments
00395                         check_fragments(payloadfiles);
00396                 }
00397 
00398                 void SQLiteBundleStorage::check_fragments(std::set<std::string> &payloadFiles)
00399                 {
00400                         int err;
00401                         size_t timestamp, sequencenumber, fragmentoffset;
00402                         string filename, source, dest, custody, repto;
00403                         set<string> consistenFiles, inconsistenSources;
00404                         set<size_t> inconsistentTimestamp, inconsistentSeq_number;
00405                         set<string>::iterator file_it, consisten_it;
00406 
00407                         sqlite3_stmt *getPayloadfiles = prepare("SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength, hopcount FROM "+_tables[SQL_TABLE_BUNDLE]+" WHERE fragmentoffset != NULL;");
00408 
00409                         for(err = sqlite3_step(getPayloadfiles); err == SQLITE_ROW; err = sqlite3_step(getPayloadfiles))
00410                         {
00411                                 filename = (const char*)sqlite3_column_text(getPayloadfiles,10);
00412                                 file_it = payloadFiles.find(filename);
00413 
00414 
00415                                 if (file_it == payloadFiles.end())
00416                                 {
00417                                         consisten_it = consistenFiles.find(*file_it);
00418 
00419                                         //inconsistent DB entry
00420                                         if(consisten_it == consistenFiles.end())
00421                                         {
00422                                                 //Generate Report
00423                                                 source  = (const char*) sqlite3_column_text(getPayloadfiles, 0);
00424                                                 dest    = (const char*) sqlite3_column_text(getPayloadfiles, 1);
00425                                                 repto   = (const char*) sqlite3_column_text(getPayloadfiles, 2);
00426                                                 custody = (const char*) sqlite3_column_text(getPayloadfiles, 3);
00427 
00428                                                 timestamp = sqlite3_column_int64(getPayloadfiles, 5);
00429                                                 sequencenumber = sqlite3_column_int64(getPayloadfiles, 6);
00430                                                 fragmentoffset = sqlite3_column_int64(getPayloadfiles, 8);
00431 
00432                                                 dtn::data::BundleID id( dtn::data::EID(source), timestamp, sequencenumber, true, fragmentoffset );
00433                                                 dtn::data::MetaBundle mb(id);
00434 
00435                                                 mb.procflags = sqlite3_column_int64(getPayloadfiles, 4);
00436                                                 mb.lifetime = sqlite3_column_int64(getPayloadfiles, 7);
00437                                                 mb.destination = dest;
00438                                                 mb.reportto = repto;
00439                                                 mb.custodian = custody;
00440                                                 mb.appdatalength = sqlite3_column_int64(getPayloadfiles, 9);
00441                                                 mb.expiretime = dtn::utils::Clock::getExpireTime(timestamp, mb.lifetime);
00442 
00443                                                 if (sqlite3_column_type(getPayloadfiles, 10) != SQLITE_NULL)
00444                                                 {
00445                                                         mb.hopcount = sqlite3_column_int64(getPayloadfiles, 10);
00446                                                 }
00447 
00448                                                 dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);
00449                                         }
00450                                 }
00451                                 //consistent DB entry
00452                                 else
00453                                         consistenFiles.insert(*file_it);
00454                                         payloadFiles.erase(file_it);
00455                         }
00456 
00457                         sqlite3_reset(getPayloadfiles);
00458                         sqlite3_finalize(getPayloadfiles);
00459                 }
00460 
00461                 void SQLiteBundleStorage::check_bundles(std::set<std::string> &blockFiles)
00462                 {
00463                         std::set<dtn::data::BundleID> corrupt_bundle_ids;
00464 
00465                         sqlite3_stmt *blockConistencyCheck = prepare("SELECT source_id, timestamp, sequencenumber, fragmentoffset, filename, ordernumber FROM "+ _tables[SQL_TABLE_BLOCK] +";");
00466 
00467                         while (sqlite3_step(blockConistencyCheck) == SQLITE_ROW)
00468                         {
00469                                 dtn::data::BundleID id;
00470 
00471                                 // get the bundleid
00472                                 get_bundleid(blockConistencyCheck, id);
00473 
00474                                 // get the filename
00475                                 std::string filename = (const char*)sqlite3_column_text(blockConistencyCheck, 4);
00476 
00477                                 // if the filename is not in the list of block files
00478                                 if (blockFiles.find(filename) == blockFiles.end())
00479                                 {
00480                                         // add the bundle to the deletion list
00481                                         corrupt_bundle_ids.insert(id);
00482                                 }
00483                                 else
00484                                 {
00485                                         // file is available, everything is fine
00486                                         blockFiles.erase(filename);
00487                                 }
00488                         }
00489                         sqlite3_reset(blockConistencyCheck);
00490                         sqlite3_finalize(blockConistencyCheck);
00491 
00492                         for(std::set<dtn::data::BundleID>::const_iterator iter = corrupt_bundle_ids.begin(); iter != corrupt_bundle_ids.end(); iter++)
00493                         {
00494                                 try {
00495                                         const dtn::data::BundleID &id = (*iter);
00496 
00497                                         // get meta data of this bundle
00498                                         const dtn::data::MetaBundle m = get_meta(id);
00499 
00500                                         // remove the hole bundle
00501                                         remove(id);
00502                                 } catch (const dtn::core::SQLiteBundleStorage::SQLiteQueryException&) { };
00503                         }
00504 
00505                         // delete block files still in the blockfile list
00506                         for (std::set<std::string>::const_iterator iter = blockFiles.begin(); iter != blockFiles.end(); iter++)
00507                         {
00508                                 ibrcommon::File blockfile(*iter);
00509                                 blockfile.remove();
00510                         }
00511                 }
00512 
00513 
00514                 sqlite3_stmt* SQLiteBundleStorage::prepare(const std::string &sqlquery)
00515                 {
00516                         // prepare the statement
00517                         sqlite3_stmt *statement;
00518                         int err = sqlite3_prepare_v2(_database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00519                         if ( err != SQLITE_OK )
00520                         {
00521                                 IBRCOMMON_LOGGER(error) << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << IBRCOMMON_LOGGER_ENDL;
00522                         }
00523                         return statement;
00524                 }
00525 
00526                 dtn::data::MetaBundle SQLiteBundleStorage::get_meta(const dtn::data::BundleID &id)
00527                 {
00528                         dtn::data::MetaBundle bundle;
00529 
00530                         // check if the bundle is already on the deletion list
00531                         if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00532 
00533                         size_t stmt_key = BUNDLE_GET_ID;
00534                         if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00535 
00536                         // lock the prepared statement
00537                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00538 
00539                         // bind bundle id to the statement
00540                         set_bundleid(_statements[stmt_key], id);
00541 
00542                         // execute the query and check for error
00543                         if (sqlite3_step(_statements[stmt_key]) != SQLITE_ROW)
00544                         {
00545                                 stringstream error;
00546                                 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString();
00547                                 IBRCOMMON_LOGGER_DEBUG(15) << error.str() << IBRCOMMON_LOGGER_ENDL;
00548                                 throw SQLiteQueryException(error.str());
00549                         }
00550 
00551                         // query bundle data
00552                         get(_statements[stmt_key], bundle);
00553 
00554                         return bundle;
00555                 }
00556 
00557                 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset)
00558                 {
00559                         bundle.source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00560                         bundle.destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00561                         bundle.reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00562                         bundle.custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00563                         bundle.procflags = sqlite3_column_int(st, offset + 4);
00564                         bundle.timestamp = sqlite3_column_int64(st, offset + 5);
00565                         bundle.sequencenumber = sqlite3_column_int64(st, offset + 6);
00566                         bundle.lifetime = sqlite3_column_int64(st, offset + 7);
00567                         bundle.expiretime = dtn::utils::Clock::getExpireTime(bundle.timestamp, bundle.lifetime);
00568 
00569                         if (bundle.procflags & data::Bundle::FRAGMENT)
00570                         {
00571                                 bundle.offset = sqlite3_column_int64(st, offset + 8);
00572                                 bundle.appdatalength = sqlite3_column_int64(st, offset + 9);
00573                         }
00574 
00575                         if (sqlite3_column_type(st, offset + 10) != SQLITE_NULL)
00576                         {
00577                                 bundle.hopcount = sqlite3_column_int64(st, 10);
00578                         }
00579                 }
00580 
00581                 void SQLiteBundleStorage::get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset)
00582                 {
00583                         bundle._source = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 0) );
00584                         bundle._destination = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 1) );
00585                         bundle._reportto = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 2) );
00586                         bundle._custodian = dtn::data::EID( (const char*) sqlite3_column_text(st, offset + 3) );
00587                         bundle._procflags = sqlite3_column_int(st, offset + 4);
00588                         bundle._timestamp = sqlite3_column_int64(st, offset + 5);
00589                         bundle._sequencenumber = sqlite3_column_int64(st, offset + 6);
00590                         bundle._lifetime = sqlite3_column_int64(st, offset + 7);
00591 
00592                         if (bundle._procflags & data::Bundle::FRAGMENT)
00593                         {
00594                                 bundle._fragmentoffset = sqlite3_column_int64(st, offset + 8);
00595                                 bundle._appdatalength = sqlite3_column_int64(st, offset + 9);
00596                         }
00597                 }
00598 
00599                 const std::list<dtn::data::MetaBundle> SQLiteBundleStorage::get(BundleFilterCallback &cb)
00600                 {
00601                         std::list<dtn::data::MetaBundle> ret;
00602 
00603                         const std::string base_query =
00604                                         "SELECT source, destination, reportto, custodian, procflags, timestamp, sequencenumber, lifetime, fragmentoffset, appdatalength, hopcount FROM " + _tables[SQL_TABLE_BUNDLE];
00605 
00606                         sqlite3_stmt *st = NULL;
00607                         size_t bind_offset = 1;
00608 
00609                         try {
00610                                 SQLBundleQuery &query = dynamic_cast<SQLBundleQuery&>(cb);
00611 
00612                                 if (query._statement == NULL)
00613                                 {
00614                                         std::string sql = base_query + " WHERE " + query.getWhere() + " ORDER BY priority DESC";
00615 
00616                                         if (cb.limit() > 0)
00617                                         {
00618                                                 sql += " LIMIT ?,?";
00619                                         }
00620 
00621                                         // add closing delimiter
00622                                         sql += ";";
00623 
00624                                         // prepare the hole statement
00625                                         query._statement = prepare(sql);
00626                                 }
00627 
00628                                 st = query._statement;
00629                                 bind_offset = query.bind(st, 1);
00630                         } catch (const std::bad_cast&) {
00631                                 // this query is not optimized for sql, use the generic way (slow)
00632                                 st = _statements[BUNDLE_GET_FILTER];
00633                         };
00634 
00635                         size_t offset = 0;
00636                         while (true)
00637                         {
00638                                 // lock the database
00639                                 AutoResetLock l(_locks[BUNDLE_GET_FILTER], st);
00640 
00641                                 if (cb.limit() > 0)
00642                                 {
00643                                         sqlite3_bind_int64(st, bind_offset, offset);
00644                                         sqlite3_bind_int64(st, bind_offset + 1, cb.limit());
00645                                 }
00646 
00647                                 if (sqlite3_step(st) == SQLITE_DONE)
00648                                 {
00649                                         // returned no result
00650                                         break;
00651                                 }
00652 
00653                                 while (true)
00654                                 {
00655                                         dtn::data::MetaBundle m;
00656 
00657                                         // extract the primary values and set them in the bundle object
00658                                         get(st, m, 0);
00659 
00660                                         // check if the bundle is already on the deletion list
00661                                         if (!contains_deletion(m))
00662                                         {
00663                                                 // ask the filter if this bundle should be added to the return list
00664                                                 if (cb.shouldAdd(m))
00665                                                 {
00666                                                         IBRCOMMON_LOGGER_DEBUG(40) << "add bundle to query selection list: " << m.toString() << IBRCOMMON_LOGGER_ENDL;
00667 
00668                                                         // add the bundle to the list
00669                                                         ret.push_back(m);
00670                                                 }
00671                                         }
00672 
00673                                         if (sqlite3_step(st) != SQLITE_ROW)
00674                                         {
00675                                                 break;
00676                                         }
00677 
00678                                         // abort if enough bundles are found
00679                                         if ((cb.limit() > 0) && (ret.size() >= cb.limit())) break;
00680                                 }
00681 
00682                                 // if a limit is set
00683                                 if (cb.limit() > 0)
00684                                 {
00685                                         // increment the offset, because we might not have enough
00686                                         offset += cb.limit();
00687 
00688                                         // abort if enough bundles are found
00689                                         if (ret.size() >= cb.limit()) break;
00690                                 }
00691                                 else
00692                                 {
00693                                         break;
00694                                 }
00695                         }
00696 
00697                         return ret;
00698                 }
00699 
00700                 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id)
00701                 {
00702                         dtn::data::Bundle bundle;
00703                         int err = 0;
00704 
00705                         IBRCOMMON_LOGGER_DEBUG(25) << "get bundle from sqlite storage " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00706 
00707                         // if bundle is deleted?
00708                         if (contains_deletion(id)) throw dtn::core::BundleStorage::NoBundleFoundException();
00709 
00710                         size_t stmt_key = BUNDLE_GET_ID;
00711                         if (id.fragment) stmt_key = FRAGMENT_GET_ID;
00712 
00713                         // do this while db is locked
00714                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
00715 
00716                         // set the bundle key values
00717                         set_bundleid(_statements[stmt_key], id);
00718 
00719                         // execute the query and check for error
00720                         if ((err = sqlite3_step(_statements[stmt_key])) != SQLITE_ROW)
00721                         {
00722                                 IBRCOMMON_LOGGER_DEBUG(15) << "sql error: " << err << "; No bundle found with id: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
00723                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00724                         }
00725 
00726                         // read bundle data
00727                         get(_statements[stmt_key], bundle);
00728 
00729                         try {
00730                                 // read all blocks
00731                                 get_blocks(bundle, id);
00732                         } catch (const ibrcommon::Exception &ex) {
00733                                 IBRCOMMON_LOGGER(error) << "could not get bundle blocks: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00734                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00735                         }
00736 
00737                         return bundle;
00738                 }
00739 
00740 #ifdef SQLITE_STORAGE_EXTENDED
00741                 void SQLiteBundleStorage::setPriority(const int priority, const dtn::data::BundleID &id)
00742                 {
00743                         int err;
00744                         size_t procflags;
00745                         {
00746                                 sqlite3_bind_text(_statements[PROCFLAGS_GET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00747                                 err = sqlite3_step(_statements[PROCFLAGS_GET]);
00748                                 if(err == SQLITE_ROW)
00749                                         procflags = sqlite3_column_int(_statements[PROCFLAGS_GET],0);
00750                                 sqlite3_reset(_statements[PROCFLAGS_GET]);
00751                                 if(err != SQLITE_DONE){
00752                                         stringstream error;
00753                                         error << "SQLiteBundleStorage: error while Select Querry: " << err;
00754                                         IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00755                                         throw SQLiteQueryException(error.str());
00756                                 }
00757 
00758                                 //Priority auf 0 Setzen
00759                                 procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2));
00760 
00761                                 //Set Prioritybits
00762                                 switch(priority){
00763                                 case 1: procflags += data::Bundle::PRIORITY_BIT1; break; //Set PRIORITY_BIT1
00764                                 case 2: procflags += data::Bundle::PRIORITY_BIT2; break;
00765                                 case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break;
00766                                 }
00767 
00768                                 sqlite3_bind_text(_statements[PROCFLAGS_SET],1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00769                                 sqlite3_bind_int64(_statements[PROCFLAGS_SET],2,priority);
00770                                 sqlite3_step(_statements[PROCFLAGS_SET]);
00771                                 sqlite3_reset(_statements[PROCFLAGS_SET]);
00772 
00773                                 if(err != SQLITE_DONE)
00774                                 {
00775                                         stringstream error;
00776                                         error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err;
00777                                         IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00778                                         throw SQLiteQueryException(error.str());
00779                                 }
00780                         }
00781                 }
00782 #endif
00783 
00784                 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle)
00785                 {
00786                         IBRCOMMON_LOGGER_DEBUG(25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00787 
00788 //                      // if the bundle is a fragment store it as one
00789 //                      if (bundle.get(dtn::data::Bundle::FRAGMENT) && local)
00790 //                      {
00791 //                              storeFragment(bundle);
00792 //                              return;
00793 //                      }
00794 
00795                         int err;
00796 
00797                         const dtn::data::EID _sourceid = bundle._source;
00798                         size_t TTL = bundle._timestamp + bundle._lifetime;
00799 
00800                         AutoResetLock l(_locks[BUNDLE_STORE], _statements[BUNDLE_STORE]);
00801 
00802                         set_bundleid(_statements[BUNDLE_STORE], bundle);
00803 
00804                         sqlite3_bind_text(_statements[BUNDLE_STORE], 5, bundle._source.getString().c_str(), bundle._source.getString().length(), SQLITE_TRANSIENT);
00805                         sqlite3_bind_text(_statements[BUNDLE_STORE], 6, bundle._destination.getString().c_str(), bundle._destination.getString().length(), SQLITE_TRANSIENT);
00806                         sqlite3_bind_text(_statements[BUNDLE_STORE], 7, bundle._reportto.getString().c_str(), bundle._reportto.getString().length(), SQLITE_TRANSIENT);
00807                         sqlite3_bind_text(_statements[BUNDLE_STORE], 8, bundle._custodian.getString().c_str(), bundle._custodian.getString().length(), SQLITE_TRANSIENT);
00808                         sqlite3_bind_int(_statements[BUNDLE_STORE], 9, bundle._procflags);
00809                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 10, bundle._lifetime);
00810 
00811                         if (bundle.get(dtn::data::Bundle::FRAGMENT))
00812                         {
00813                                 sqlite3_bind_int64(_statements[BUNDLE_STORE], 11, bundle._appdatalength);
00814                         }
00815                         else
00816                         {
00817                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 4);
00818                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 11);
00819                         }
00820 
00821                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 12, TTL);
00822                         sqlite3_bind_int64(_statements[BUNDLE_STORE], 13, dtn::data::MetaBundle(bundle).getPriority());
00823 
00824                         try {
00825                                 const dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<const dtn::data::ScopeControlHopLimitBlock>();
00826                                 sqlite3_bind_int64(_statements[BUNDLE_STORE], 14, schl.getHopsToLive() );
00827                         } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00828                                 sqlite3_bind_null(_statements[BUNDLE_STORE], 14 );
00829                         };
00830 
00831                         // start a transaction
00832                         sqlite3_exec(_database, "BEGIN TRANSACTION;", NULL, NULL, NULL);
00833 
00834                         // store the bundle data in the database
00835                         err = sqlite3_step(_statements[BUNDLE_STORE]);
00836 
00837                         if (err == SQLITE_CONSTRAINT)
00838                         {
00839                                 IBRCOMMON_LOGGER(warning) << "Bundle is already in the storage" << IBRCOMMON_LOGGER_ENDL;
00840                         }
00841                         else if (err != SQLITE_DONE)
00842                         {
00843                                 stringstream error;
00844                                 error << "SQLiteBundleStorage: store() failure: " << err << " " <<  sqlite3_errmsg(_database);
00845                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00846 
00847                                 // rollback the whole transaction
00848                                 sqlite3_exec(_database, "ROLLBACK TRANSACTION;", NULL, NULL, NULL);
00849 
00850                                 throw SQLiteQueryException(error.str());
00851                         }
00852                         else
00853                         {
00854                                 // stores the blocks to a file
00855                                 store_blocks(bundle);
00856 
00857                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
00858                         }
00859 
00860                         // commit the transaction
00861                         sqlite3_exec(_database, "END TRANSACTION;", NULL, NULL, NULL);
00862 
00863                         // set new expire time
00864                         new_expire_time(TTL);
00865 
00866                         try {
00867                                 // the bundle is stored sucessfully, we could accept custody if it is requested
00868                                 const dtn::data::EID custodian = acceptCustody(bundle);
00869 
00870                                 // update the custody address of this bundle
00871                                 update_custodian(bundle, custodian);
00872                         } catch (const ibrcommon::Exception&) {
00873                                 // this bundle has no request for custody transfers
00874                         }
00875                 }
00876 
00877                 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id)
00878                 {
00879                         add_deletion(id);
00880                         _tasks.push(new TaskRemove(id));
00881                 }
00882 
00883                 void SQLiteBundleStorage::TaskRemove::run(SQLiteBundleStorage &storage)
00884                 {
00885                         {
00886                                 // select the right statement to use
00887                                 const size_t stmt_key = _id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
00888 
00889                                 // lock the database
00890                                 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00891 
00892                                 // set the bundle key values
00893                                 storage.set_bundleid(storage._statements[stmt_key], _id);
00894 
00895                                 // step through all blocks
00896                                 while (sqlite3_step(storage._statements[stmt_key]) == SQLITE_ROW)
00897                                 {
00898                                         // delete each referenced block file
00899                                         ibrcommon::File blockfile( (const char*)sqlite3_column_text(storage._statements[stmt_key], 0) );
00900                                         blockfile.remove();
00901                                 }
00902                         }
00903 
00904                         {
00905                                 const size_t stmt_key = _id.fragment ? FRAGMENT_DELETE : BUNDLE_DELETE;
00906 
00907                                 // lock the database
00908                                 AutoResetLock l(storage._locks[stmt_key], storage._statements[stmt_key]);
00909 
00910                                 // then remove the bundle data
00911                                 storage.set_bundleid(storage._statements[stmt_key], _id);
00912                                 sqlite3_step(storage._statements[stmt_key]);
00913 
00914                                 IBRCOMMON_LOGGER_DEBUG(10) << "bundle " << _id.toString() << " deleted" << IBRCOMMON_LOGGER_ENDL;
00915                         }
00916 
00917                         //update deprecated timer
00918                         storage.update_expire_time();
00919 
00920                         // remove it from the deletion list
00921                         storage.remove_deletion(_id);
00922                 }
00923 
00924 #ifdef SQLITE_STORAGE_EXTENDED
00925                 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
00926                 {
00927                         int err;
00928                         string result;
00929 
00930                         sqlite3_bind_text(_statements[ROUTING_GET],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00931                         sqlite3_bind_int(_statements[ROUTING_GET],2,key);
00932                         err = sqlite3_step(_statements[ROUTING_GET]);
00933                         if(err == SQLITE_ROW){
00934                                 result = (const char*) sqlite3_column_text(_statements[ROUTING_GET],0);
00935                         }
00936                         sqlite3_reset(_statements[ROUTING_GET]);
00937                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00938                                 stringstream error;
00939                                 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err;
00940                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00941                                 throw SQLiteQueryException(error.str());
00942                         }
00943 
00944                         return result;
00945                 }
00946 
00947                 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key)
00948                 {
00949                         int err;
00950                         string result;
00951 
00952                         err = sqlite3_bind_text(_statements[NODE_GET],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00953                         err = sqlite3_bind_int(_statements[NODE_GET],2,key);
00954                         err = sqlite3_step(_statements[NODE_GET]);
00955                         if(err == SQLITE_ROW){
00956                                 result = (const char*) sqlite3_column_text(_statements[NODE_GET],0);
00957                         }
00958                         sqlite3_reset(_statements[NODE_GET]);
00959                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00960                                 stringstream error;
00961                                 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00962                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00963                                 throw SQLiteQueryException(error.str());
00964                         }
00965 
00966                         return result;
00967                 }
00968 
00969                 std::string SQLiteBundleStorage::getRoutingInfo(const int &key)
00970                 {
00971                         int err;
00972                         string result;
00973 
00974                         sqlite3_bind_int(_statements[INFO_GET],1,key);
00975                         err = sqlite3_step(_statements[INFO_GET]);
00976                         if(err == SQLITE_ROW){
00977                                 result = (const char*) sqlite3_column_text(_statements[INFO_GET],0);
00978                         }
00979                         sqlite3_reset(_statements[INFO_GET]);
00980                         if(err != SQLITE_DONE && err != SQLITE_ROW){
00981                                 stringstream error;
00982                                 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(_database);
00983                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
00984                                 throw SQLiteQueryException(error.str());
00985                         }
00986 
00987                         return result;
00988                 }
00989 
00990                 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo)
00991                 {
00992                         int err;
00993 
00994                         sqlite3_bind_text(_statements[ROUTING_STORE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00995                         sqlite3_bind_int(_statements[ROUTING_STORE],2,key);
00996                         sqlite3_bind_text(_statements[ROUTING_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00997                         err = sqlite3_step(_statements[ROUTING_STORE]);
00998                         sqlite3_reset(_statements[ROUTING_STORE]);
00999                         if(err == SQLITE_CONSTRAINT)
01000                                 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl;
01001                         else if(err != SQLITE_DONE){
01002                                 stringstream error;
01003                                 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err;
01004                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01005                                 throw SQLiteQueryException(error.str());
01006                         }
01007                 }
01008 
01009                 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo)
01010                 {
01011                         int err;
01012 
01013                         sqlite3_bind_text(_statements[NODE_STORE],1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT);
01014                         sqlite3_bind_int(_statements[NODE_STORE],2,key);
01015                         sqlite3_bind_text(_statements[NODE_STORE],3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
01016                         err = sqlite3_step(_statements[NODE_STORE]);
01017                         sqlite3_reset(_statements[NODE_STORE]);
01018                         if(err == SQLITE_CONSTRAINT)
01019                                 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl;
01020                         else if(err != SQLITE_DONE){
01021                                 stringstream error;
01022                                 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err;
01023                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01024                                 throw SQLiteQueryException(error.str());
01025                         }
01026                 }
01027 
01028                 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo)
01029                 {
01030                         int err;
01031 
01032                         sqlite3_bind_int(_statements[INFO_STORE],1,key);
01033                         sqlite3_bind_text(_statements[INFO_STORE],2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
01034                         err = sqlite3_step(_statements[INFO_STORE]);
01035                         sqlite3_reset(_statements[INFO_STORE]);
01036                         if(err == SQLITE_CONSTRAINT)
01037                                 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl;
01038                         else if(err != SQLITE_DONE){
01039                                 stringstream error;
01040                                 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err;
01041                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01042                                 throw SQLiteQueryException(error.str());
01043                         }
01044                 }
01045 
01046                 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key)
01047                 {
01048                         int err;
01049 
01050                         sqlite3_bind_text(_statements[ROUTING_REMOVE],1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
01051                         sqlite3_bind_int(_statements[ROUTING_REMOVE],2,key);
01052                         err = sqlite3_step(_statements[ROUTING_REMOVE]);
01053                         sqlite3_reset(_statements[ROUTING_REMOVE]);
01054                         if(err != SQLITE_DONE){
01055                                 stringstream error;
01056                                 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err;
01057                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01058                                 throw SQLiteQueryException(error.str());
01059                         }
01060                 }
01061 
01062                 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key)
01063                 {
01064                         int err;
01065 
01066                         sqlite3_bind_text(_statements[NODE_REMOVE],1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
01067                         sqlite3_bind_int(_statements[NODE_REMOVE],2,key);
01068                         err = sqlite3_step(_statements[NODE_REMOVE]);
01069                         sqlite3_reset(_statements[NODE_REMOVE]);
01070                         if(err != SQLITE_DONE){
01071                                 stringstream error;
01072                                 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err;
01073                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01074                                 throw SQLiteQueryException(error.str());
01075                         }
01076                 }
01077 
01078                 void SQLiteBundleStorage::removeRoutingInfo(const int &key)
01079                 {
01080                         int err;
01081 
01082                         sqlite3_bind_int(_statements[INFO_REMOVE],1,key);
01083                         err = sqlite3_step(_statements[INFO_REMOVE]);
01084                         sqlite3_reset(_statements[INFO_REMOVE]);
01085                         if(err != SQLITE_DONE){
01086                                 stringstream error;
01087                                 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err;
01088                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01089                                 throw SQLiteQueryException(error.str());
01090                         }
01091                 }
01092 #endif
01093 
01094                 void SQLiteBundleStorage::clearAll()
01095                 {
01096 #ifdef SQLITE_STORAGE_EXTENDED
01097                         sqlite3_step(_statements[ROUTING_CLEAR]);
01098                         sqlite3_reset(_statements[ROUTING_CLEAR]);
01099                         sqlite3_step(_statements[NODE_CLEAR]);
01100                         sqlite3_reset(_statements[NODE_CLEAR]);
01101 #endif
01102 
01103                         clear();
01104                 }
01105 
01106                 void SQLiteBundleStorage::clear()
01107                 {
01108                         AutoResetLock l(_locks[VACUUM], _statements[VACUUM]);
01109 
01110                         sqlite3_step(_statements[BUNDLE_CLEAR]);
01111                         sqlite3_reset(_statements[BUNDLE_CLEAR]);
01112                         sqlite3_step(_statements[BLOCK_CLEAR]);
01113                         sqlite3_reset(_statements[BLOCK_CLEAR]);
01114 
01115                         if(SQLITE_DONE != sqlite3_step(_statements[VACUUM]))
01116                         {
01117                                 sqlite3_reset(_statements[VACUUM]);
01118                                 throw SQLiteQueryException("SQLiteBundleStore: clear(): vacuum failed.");
01119                         }
01120 
01121                         //Delete Folder SQL_TABLE_BLOCK containing Blocks
01122                         {
01123                                 _blockPath.remove(true);
01124                                 ibrcommon::File::createDirectory(_blockPath);
01125                         }
01126 
01127                         reset_expire_time();
01128                 }
01129 
01130 
01131 
01132                 bool SQLiteBundleStorage::empty()
01133                 {
01134                         AutoResetLock l(_locks[EMPTY_CHECK], _statements[EMPTY_CHECK]);
01135 
01136                         if (SQLITE_DONE == sqlite3_step(_statements[EMPTY_CHECK]))
01137                         {
01138                                 return true;
01139                         }
01140                         else
01141                         {
01142                                 return false;
01143                         }
01144                 }
01145 
01146                 unsigned int SQLiteBundleStorage::count()
01147                 {
01148                         int rows = 0;
01149                         int err = 0;
01150 
01151                         AutoResetLock l(_locks[COUNT_ENTRIES], _statements[COUNT_ENTRIES]);
01152 
01153                         if ((err = sqlite3_step(_statements[COUNT_ENTRIES])) == SQLITE_ROW)
01154                         {
01155                                 rows = sqlite3_column_int(_statements[COUNT_ENTRIES], 0);
01156                         }
01157                         else
01158                         {
01159                                 stringstream error;
01160                                 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(_database);
01161                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01162                                 throw SQLiteQueryException(error.str());
01163                         }
01164 
01165                         return rows;
01166                 }
01167 
01168 #ifdef SQLITE_STORAGE_EXTENDED
01169                 int SQLiteBundleStorage::occupiedSpace()
01170                 {
01171                         int size = 0;
01172                         std::list<ibrcommon::File> files;
01173 
01174                         if (_blockPath.getFiles(files))
01175                         {
01176                                 IBRCOMMON_LOGGER(error) << "occupiedSpace: unable to open Directory " << _blockPath.getPath() << IBRCOMMON_LOGGER_ENDL;
01177                                 return -1;
01178                         }
01179 
01180                         for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
01181                         {
01182                                 const ibrcommon::File &f = (*iter);
01183 
01184                                 if (!f.isSystem())
01185                                 {
01186                                         size += f.size();
01187                                 }
01188                         }
01189 
01190                         // add db file size
01191                         size += dbFile.size();
01192 
01193                         return size;
01194                 }
01195 
01196                 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle)
01197                 {
01198                         int err = 0;
01199                         size_t bitcounter = 0;
01200                         bool allFragmentsReceived(true);
01201                         size_t payloadsize, TTL, fragmentoffset;
01202 
01203                         ibrcommon::File payloadfile, filename;
01204 
01205                         // generate a bundle id string
01206                         std::string bundleID = dtn::data::BundleID(bundle).toString();
01207 
01208                         // get the destination id string
01209                         std::string destination = bundle._destination.getString();
01210 
01211                         // get the source id string
01212                         std::string sourceEID = bundle._source.getString();
01213 
01214                         // calculate the expiration timestamp
01215                         TTL = bundle._timestamp + bundle._lifetime;
01216 
01217                         const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>();
01218                         ibrcommon::BLOB::Reference payloadBlob = block.getBLOB();
01219 
01220                         // get the payload size
01221                         {
01222                                 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01223                                 payloadsize = io.size();
01224                         }
01225 
01226                         //Saves the blocks from the first and the last Fragment, they may contain Extentionblock
01227                         bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength;
01228                         bool first = bundle._fragmentoffset == 0;
01229 
01230                         if(first || last)
01231                         {
01232                                 int i = 0, blocknumber = 0;
01233 
01234                                 // get the list of blocks
01235                                 std::list<const dtn::data::Block*> blocklist = bundle.getBlocks();
01236 
01237                                 // iterate through all blocks
01238                                 for (std::list<const dtn::data::Block* >::const_iterator it = blocklist.begin(); it != blocklist.end(); it++, i++)
01239                                 {
01240                                         // generate a temporary block file
01241                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01242 
01243                                         ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01244 
01245                                         dtn::data::SeparateSerializer serializer(datei);
01246                                         serializer << (*(*it));
01247                                         datei.close();
01248 
01249                                         /* Schreibt Blöcke in die DB.
01250                                          * Die Blocknummern werden nach folgendem Schema in die DB geschrieben:
01251                                          *              Die Blöcke des ersten Fragments fangen bei -x an und gehen bis -1
01252                                          *              Die Blöcke des letzten Fragments beginnen bei 1 und gehen bis x
01253                                          *              Der Payloadblock wird an Stelle 0 eingefügt.
01254                                          */
01255                                         sqlite3_bind_text(_statements[BLOCK_STORE], 1, bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01256                                         sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01257                                         sqlite3_bind_text(_statements[BLOCK_STORE], 3, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01258 
01259                                         blocknumber = blocklist.size() - i;
01260 
01261                                         if (first)
01262                                         {
01263                                                 blocknumber = (blocklist.size() - i)*(-1);
01264                                         }
01265 
01266                                         sqlite3_bind_int(_statements[BLOCK_STORE],4,blocknumber);
01267                                         executeQuery(_statements[BLOCK_STORE]);
01268                                 }
01269                         }
01270 
01271                         //At first the database is checked for already received bundles and some
01272                         //informations are read, which effect the following program flow.
01273                         sqlite3_bind_text(_statements[BUNDLE_GET_FRAGMENT],1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT);
01274                         sqlite3_bind_int64(_statements[BUNDLE_GET_FRAGMENT],2,bundle._timestamp);
01275                         sqlite3_bind_int(_statements[BUNDLE_GET_FRAGMENT],3,bundle._sequencenumber);
01276                         err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01277 
01278                         //Es ist noch kein Eintrag vorhanden: generiere Payloadfilename
01279                         if(err == SQLITE_DONE)
01280                         {
01281                                 // generate a temporary block file
01282                                 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01283                                 payloadfile = tmpfile;
01284                         }
01285                         //Es ist bereits ein eintrag vorhanden: speicher den payloadfilename
01286                         else if (err == SQLITE_ROW)
01287                         {
01288                                 payloadfile = ibrcommon::File( (const char*)sqlite3_column_text(_statements[BUNDLE_GET_FRAGMENT], 14) );                                //14 = Payloadfilename
01289                         }
01290 
01291                         while (err == SQLITE_ROW){
01292                                 //The following codepice summs the payloadfragmentbytes, to check if all fragments were received
01293                                 //and the bundle is complete.
01294                                 fragmentoffset = sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],9);                                                                // 9 = fragmentoffset
01295 
01296                                 if(bitcounter >= fragmentoffset){
01297                                         bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(_statements[BUNDLE_GET_FRAGMENT],13);    //13 = payloadlength
01298                                 }
01299                                 else if(bitcounter >= bundle._fragmentoffset){
01300                                         bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01301                                 }
01302                                 else{
01303                                         allFragmentsReceived = false;
01304                                 }
01305                                 err = sqlite3_step(_statements[BUNDLE_GET_FRAGMENT]);
01306                         }
01307 
01308                         if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived) //Wenn das aktuelle fragment das größte ist, ist die Liste durchlaufen und das Frag nicht dazugezählt. Dies wird hier nachgeholt.
01309                         {
01310                                 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01311                         }
01312 
01313                         sqlite3_reset(_statements[BUNDLE_GET_FRAGMENT]);
01314                         if(err != SQLITE_DONE){
01315                                 stringstream error;
01316                                 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(_database);
01317                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01318                                 throw SQLiteQueryException(error.str());
01319                         }
01320 
01321                         //Save the Payload in a separate file
01322                         std::fstream datei(payloadfile.getPath().c_str(), std::ios::in | std::ios::out | std::ios::binary);
01323                         datei.seekp(bundle._fragmentoffset);
01324                         {
01325                                 ibrcommon::BLOB::iostream io = payloadBlob.iostream();
01326 
01327                                 size_t ret = 0;
01328                                 istream &s = (*io);
01329                                 s.seekg(0);
01330 
01331                                 while (true)
01332                                 {
01333                                         char buf;
01334                                         s.get(buf);
01335                                         if(!s.eof()){
01336                                                 datei.put(buf);
01337                                                 ret++;
01338                                         }
01339                                         else
01340                                                 break;
01341                                 }
01342                                 datei.close();
01343                         }
01344 
01345                         //All fragments received
01346                         if(bundle._appdatalength == bitcounter)
01347                         {
01348                                 /*
01349                                  * 1. Payloadblock zusammensetzen
01350                                  * 2. PayloadBlock in der BlockTable sichern
01351                                  * 3. Primary BlockInfos in die BundleTable eintragen.
01352                                  * 4. Einträge der Fragmenttabelle löschen
01353                                  */
01354 
01355                                 // 1. Get the payloadblockobject
01356                                 {
01357                                         ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(payloadfile));
01358                                         dtn::data::PayloadBlock pb(ref);
01359 
01360                                         //Copy EID list
01361                                         if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS))
01362                                         {
01363                                                 std::list<dtn::data::EID> eidlist;
01364                                                 std::list<dtn::data::EID>::iterator eidIt;
01365                                                 eidlist = block.getEIDList();
01366 
01367                                                 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++)
01368                                                 {
01369                                                         pb.addEID(*eidIt);
01370                                                 }
01371                                         }
01372 
01373                                         //Copy ProcFlags
01374                                         pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT));
01375                                         pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED));
01376                                         pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED));
01377                                         pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK));
01378                                         pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED));
01379                                         pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED));
01380                                         pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS));
01381 
01382                                         //2. Save the PayloadBlock to a file and store the metainformation in the BlockTable
01383                                         ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01384 
01385                                         std::ofstream datei(tmpfile.getPath().c_str(), std::ios::out | std::ios::binary);
01386                                         dtn::data::SeparateSerializer serializer(datei);
01387                                         serializer << (pb);
01388                                         datei.close();
01389 
01390                                         filename = tmpfile;
01391                                 }
01392 
01393                                 sqlite3_bind_text(_statements[BLOCK_STORE], 1,bundleID.c_str(), bundleID.length(), SQLITE_TRANSIENT);
01394 //                              sqlite3_bind_int(_statements[BLOCK_STORE], 2, (int)((*it)->getType()));
01395                                 sqlite3_bind_text(_statements[BLOCK_STORE], 3, filename.getPath().c_str(), filename.getPath().length(), SQLITE_TRANSIENT);
01396                                 sqlite3_bind_int(_statements[BLOCK_STORE], 4, 0);
01397                                 executeQuery(_statements[BLOCK_STORE]);
01398 
01399                                 //3. Delete reassembled Payload data
01400                                 payloadfile.remove();
01401 
01402                                 //4. Remove Fragment entries from BundleTable
01403                                 sqlite3_bind_text(_statements[BUNDLE_DELETE],1,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01404                                 sqlite3_bind_int64(_statements[BUNDLE_DELETE],2,bundle._sequencenumber);
01405                                 sqlite3_bind_int64(_statements[BUNDLE_DELETE],3,bundle._timestamp);
01406                                 executeQuery(_statements[BUNDLE_DELETE]);
01407 
01408                                 //5. Write the Primary Block to the BundleTable
01409                                 size_t procFlags = bundle._procflags;
01410                                 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT);
01411 
01412                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01413                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01414                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01415                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01416                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01417                                 sqlite3_bind_int(_statements[BUNDLE_STORE],6,procFlags);
01418                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01419                                 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01420                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01421                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01422                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01423                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01424                                 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01425                                 sqlite3_bind_int(_statements[BUNDLE_STORE],14,NULL);
01426                                 sqlite3_bind_text(_statements[BUNDLE_STORE],15,NULL,0,SQLITE_TRANSIENT);
01427                                 executeQuery(_statements[BUNDLE_STORE]);
01428                         }
01429 
01430                         //There are some fragments missing
01431                         else{
01432                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01433                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01434                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01435                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01436                                 sqlite3_bind_text(_statements[BUNDLE_STORE], 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01437                                 sqlite3_bind_int(_statements[BUNDLE_STORE],6,bundle._procflags);
01438                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],7,bundle._timestamp);
01439                                 sqlite3_bind_int(_statements[BUNDLE_STORE],8,bundle._sequencenumber);
01440                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],9,bundle._lifetime);
01441                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],10,bundle._fragmentoffset);
01442                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],11,bundle._appdatalength);
01443                                 sqlite3_bind_int64(_statements[BUNDLE_STORE],12,TTL);
01444                                 sqlite3_bind_int(_statements[BUNDLE_STORE],13,NULL);
01445                                 sqlite3_bind_int(_statements[BUNDLE_STORE],14,payloadsize);
01446                                 sqlite3_bind_text(_statements[BUNDLE_STORE],15,payloadfile.getPath().c_str(),payloadfile.getPath().length(),SQLITE_TRANSIENT);
01447                                 executeQuery(_statements[BUNDLE_STORE]);
01448                         }
01449                 }
01450 #endif
01451 
01452                 void SQLiteBundleStorage::raiseEvent(const Event *evt)
01453                 {
01454                         try {
01455                                 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
01456 
01457                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
01458                                 {
01459                                         _tasks.push(new TaskExpire(time.getTimestamp()));
01460                                 }
01461                         } catch (const std::bad_cast&) { }
01462                         
01463                         try {
01464                                 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt);
01465 
01466                                 if(global.getAction() == GlobalEvent::GLOBAL_IDLE)
01467                                 {
01468                                         // switch to idle mode
01469                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
01470                                         TaskIdle::_idle = true;
01471 
01472                                         // generate an idle task
01473                                         _tasks.push(new TaskIdle());
01474                                 }
01475                                 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY)
01476                                 {
01477                                         // switch back to non-idle mode
01478                                         ibrcommon::MutexLock l(TaskIdle::_mutex);
01479                                         TaskIdle::_idle = false;
01480                                 }
01481                         } catch (const std::bad_cast&) { }
01482                 }
01483 
01484                 void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
01485                 {
01486                         /*
01487                          * Only if the actual time is bigger or equal than the time when the next bundle expires, deleteexpired is called.
01488                          */
01489                         size_t exp_time = storage.get_expire_time();
01490                         if ((_timestamp < exp_time) || (exp_time == 0)) return;
01491 
01492                         /*
01493                          * Performanceverbesserung: Damit die Abfragen nicht jede Sekunde ausgeführt werden müssen, speichert man den Zeitpunkt an dem
01494                          * das nächste Bündel gelöscht werden soll in eine Variable und führt deleteexpired erst dann aus wenn ein Bündel abgelaufen ist.
01495                          * Nach dem Löschen wird die DB durchsucht und der nächste Ablaufzeitpunkt wird in die Variable gesetzt.
01496                          */
01497 
01498                         {
01499                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_FILENAMES], storage._statements[EXPIRE_BUNDLE_FILENAMES]);
01500 
01501                                 // query for blocks of expired bundles
01502                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_FILENAMES], 1, _timestamp);
01503                                 while (sqlite3_step(storage._statements[EXPIRE_BUNDLE_FILENAMES]) == SQLITE_ROW)
01504                                 {
01505                                         ibrcommon::File block((const char*)sqlite3_column_text(storage._statements[EXPIRE_BUNDLE_FILENAMES],0));
01506                                         block.remove();
01507                                 }
01508                         }
01509 
01510                         {
01511                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLES], storage._statements[EXPIRE_BUNDLES]);
01512 
01513                                 // query expired bundles
01514                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLES], 1, _timestamp);
01515                                 while (sqlite3_step(storage._statements[EXPIRE_BUNDLES]) == SQLITE_ROW)
01516                                 {
01517                                         dtn::data::BundleID id;
01518                                         storage.get_bundleid(storage._statements[EXPIRE_BUNDLES], id);
01519                                         dtn::core::BundleExpiredEvent::raise(id);
01520                                 }
01521                         }
01522 
01523                         {
01524                                 AutoResetLock l(storage._locks[EXPIRE_BUNDLE_DELETE], storage._statements[EXPIRE_BUNDLE_DELETE]);
01525 
01526                                 // delete all expired db entries (bundles and blocks)
01527                                 sqlite3_bind_int64(storage._statements[EXPIRE_BUNDLE_DELETE], 1, _timestamp);
01528                                 sqlite3_step(storage._statements[EXPIRE_BUNDLE_DELETE]);
01529                         }
01530 
01531                         //update deprecated timer
01532                         storage.update_expire_time();
01533                 }
01534 
01535                 void SQLiteBundleStorage::update_expire_time()
01536                 {
01537                         ibrcommon::MutexLock l(_locks[EXPIRE_NEXT_TIMESTAMP]);
01538                         int err = sqlite3_step(_statements[EXPIRE_NEXT_TIMESTAMP]);
01539 
01540                         if (err == SQLITE_ROW)
01541                         {
01542                                 ibrcommon::MutexLock l(_next_expiration_lock);
01543                                 _next_expiration = sqlite3_column_int64(_statements[EXPIRE_NEXT_TIMESTAMP], 0);
01544                         }
01545                         else
01546                         {
01547                                 ibrcommon::MutexLock l(_next_expiration_lock);
01548                                 _next_expiration = 0;
01549                         }
01550 
01551                         sqlite3_reset(_statements[EXPIRE_NEXT_TIMESTAMP]);
01552                 }
01553 
01554                 void SQLiteBundleStorage::update_custodian(const dtn::data::BundleID &id, const dtn::data::EID &custodian)
01555                 {
01556                         // select query with or without fragmentation extension
01557                         STORAGE_STMT query = BUNDLE_UPDATE_CUSTODIAN;
01558                         if (id.fragment)        query = FRAGMENT_UPDATE_CUSTODIAN;
01559 
01560 
01561                         AutoResetLock l(_locks[query], _statements[query]);
01562 
01563                         sqlite3_bind_text(_statements[query], 1, custodian.getString().c_str(), custodian.getString().length(), SQLITE_TRANSIENT);
01564                         set_bundleid(_statements[query], id, 1);
01565 
01566                         // update the custodian in the database
01567                         int err = sqlite3_step(_statements[query]);
01568 
01569                         if (err != SQLITE_DONE)
01570                         {
01571                                 stringstream error;
01572                                 error << "SQLiteBundleStorage: update_custodian() failure: " << err << " " <<  sqlite3_errmsg(_database);
01573                                 IBRCOMMON_LOGGER(error) << error.str() << IBRCOMMON_LOGGER_ENDL;
01574                         }
01575                 }
01576 
01577                 int SQLiteBundleStorage::store_blocks(const data::Bundle &bundle)
01578                 {
01579                         int blocktyp, blocknumber(1), storedBytes(0);
01580 
01581                         // get all blocks of the bundle
01582                         const list<const dtn::data::Block*> blocklist = bundle.getBlocks();
01583 
01584                         // generate a bundle id
01585                         dtn::data::BundleID id(bundle);
01586 
01587                         for(std::list<const dtn::data::Block*>::const_iterator it = blocklist.begin() ;it != blocklist.end(); it++)
01588                         {
01589                                 const dtn::data::Block &block = (**it);
01590                                 blocktyp = (int)block.getType();
01591 
01592                                 ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
01593 
01594                                 try {
01595                                         try {
01596                                                 const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block);
01597                                                 ibrcommon::BLOB::Reference ref = payload.getBLOB();
01598                                                 ibrcommon::BLOB::iostream stream = ref.iostream();
01599 
01600                                                 const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref.getPointer());
01601 
01602                                                 // first remove the tmp file
01603                                                 tmpfile.remove();
01604 
01605                                                 // make a hardlink to the origin blob file
01606                                                 if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 )
01607                                                 {
01608                                                         tmpfile = ibrcommon::TemporaryFile(_blockPath, "block");
01609                                                         throw ibrcommon::Exception("hard-link failed");
01610                                                 }
01611                                         } catch (const std::bad_cast&) {
01612                                                 throw ibrcommon::Exception("not a Payload or SQLiteBLOB");
01613                                         }
01614 
01615                                         storedBytes += _blockPath.size();
01616                                 } catch (const ibrcommon::Exception&) {
01617                                         std::ofstream filestream(tmpfile.getPath().c_str(), std::ios_base::out | std::ios::binary);
01618                                         dtn::data::SeparateSerializer serializer(filestream);
01619                                         serializer << block;
01620                                         storedBytes += serializer.getLength(block);
01621                                         filestream.close();
01622                                 }
01623 
01624                                 // protect this query from concurrent access and enable the auto-reset feature
01625                                 AutoResetLock l(_locks[BLOCK_STORE], _statements[BLOCK_STORE]);
01626 
01627                                 // set bundle key data
01628                                 set_bundleid(_statements[BLOCK_STORE], id);
01629 
01630                                 // set the column four to null if this is not a fragment
01631                                 if (!id.fragment) sqlite3_bind_null(_statements[BLOCK_STORE], 4);
01632 
01633                                 // set the block type
01634                                 sqlite3_bind_int(_statements[BLOCK_STORE], 5, blocktyp);
01635 
01636                                 // the filename of the block data
01637                                 sqlite3_bind_text(_statements[BLOCK_STORE], 6, tmpfile.getPath().c_str(), tmpfile.getPath().size(), SQLITE_TRANSIENT);
01638 
01639                                 // the ordering number
01640                                 sqlite3_bind_int(_statements[BLOCK_STORE], 7, blocknumber);
01641 
01642                                 // execute the query and store the block in the database
01643                                 if (sqlite3_step(_statements[BLOCK_STORE]) != SQLITE_DONE)
01644                                 {
01645                                         throw SQLiteQueryException("can not store block of bundle");
01646                                 }
01647 
01648                                 //increment blocknumber
01649                                 blocknumber++;
01650                         }
01651 
01652                         return storedBytes;
01653                 }
01654 
01655                 void SQLiteBundleStorage::get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id)
01656                 {
01657                         int err = 0;
01658                         string file;
01659 
01660                         // select the right statement to use
01661                         const size_t stmt_key = id.fragment ? BLOCK_GET_ID_FRAGMENT : BLOCK_GET_ID;
01662 
01663                         AutoResetLock l(_locks[stmt_key], _statements[stmt_key]);
01664 
01665                         // set the bundle key values
01666                         set_bundleid(_statements[stmt_key], id);
01667 
01668                         // query the database and step through all blocks
01669                         while ((err = sqlite3_step(_statements[stmt_key])) == SQLITE_ROW)
01670                         {
01671                                 const ibrcommon::File f( (const char*) sqlite3_column_text(_statements[stmt_key], 0) );
01672                                 int blocktyp = sqlite3_column_int(_statements[stmt_key], 1);
01673 
01674                                 // open the file
01675                                 std::ifstream is(f.getPath().c_str(), std::ios::binary | std::ios::in);
01676 
01677                                 if (blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE)
01678                                 {
01679                                         // create a new BLOB object
01680                                         SQLiteBLOB *blob = new SQLiteBLOB(_blobPath);
01681 
01682                                         // remove the corresponding file
01683                                         blob->_file.remove();
01684 
01685                                         // generate a hardlink, pointing to the BLOB file
01686                                         if ( ::link(f.getPath().c_str(), blob->_file.getPath().c_str()) == 0)
01687                                         {
01688                                                 // create a reference of the BLOB
01689                                                 ibrcommon::BLOB::Reference ref(blob);
01690 
01691                                                 // add payload block to the bundle
01692                                                 bundle.push_back(ref);
01693                                         }
01694                                         else
01695                                         {
01696                                                 delete blob;
01697                                                 IBRCOMMON_LOGGER(error) << "unable to load bundle: failed to create a hard-link" << IBRCOMMON_LOGGER_ENDL;
01698                                         }
01699                                 }
01700                                 else
01701                                 {
01702                                         // read the block
01703                                         dtn::data::Block &block = dtn::data::SeparateDeserializer(is, bundle).readBlock();
01704 
01705                                         // close the file
01706                                         is.close();
01707 
01708                                         // modify the age block if present
01709                                         try {
01710                                                 dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block);
01711 
01712                                                 // modify the AgeBlock with the age of the file
01713                                                 time_t age = f.lastaccess() - f.lastmodify();
01714 
01715                                                 agebl.addSeconds(age);
01716                                         } catch (const std::bad_cast&) { };
01717                                 }
01718                         }
01719 
01720                         if (err == SQLITE_DONE)
01721                         {
01722                                 if (bundle.getBlocks().size() == 0)
01723                                 {
01724                                         IBRCOMMON_LOGGER(error) << "get_blocks: no blocks found for: " << id.toString() << IBRCOMMON_LOGGER_ENDL;
01725                                         throw SQLiteQueryException("no blocks found");
01726                                 }
01727                         }
01728                         else
01729                         {
01730                                 IBRCOMMON_LOGGER(error) << "get_blocks() failure: "<< err << " " << sqlite3_errmsg(_database) << IBRCOMMON_LOGGER_ENDL;
01731                                 throw SQLiteQueryException("can not query for blocks");
01732                         }
01733                 }
01734 
01735                 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
01736                 {
01737                         // until IDLE is false
01738                         while (true)
01739                         {
01740                                 /*
01741                                  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
01742                                  * This empty space will be reused the next time new information is added to the database. But in the meantime,
01743                                  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
01744                                  * cause the information in the database to become fragmented - scrattered out all across the database file rather
01745                                  * than clustered together in one place.
01746                                  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
01747                                  * and otherwise cleans up the database file structure.
01748                                  */
01749                                 {
01750                                         AutoResetLock l(storage._locks[VACUUM], storage._statements[VACUUM]);
01751                                         sqlite3_step(storage._statements[VACUUM]);
01752                                 }
01753 
01754                                 // here we can do some IDLE stuff...
01755                                 ::sleep(1);
01756 
01757                                 ibrcommon::MutexLock l(TaskIdle::_mutex);
01758                                 if (!TaskIdle::_idle) return;
01759                         }
01760                 }
01761 
01762                 const std::string SQLiteBundleStorage::getName() const
01763                 {
01764                         return "SQLiteBundleStorage";
01765                 }
01766 
01767                 void SQLiteBundleStorage::releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
01768                 {
01769                         // custody is successful transferred to another node.
01770                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
01771                         // update the custodian of this bundle with the new one
01772                         update_custodian(id, custodian);
01773                 }
01774 
01775                 void SQLiteBundleStorage::new_expire_time(size_t ttl)
01776                 {
01777                         ibrcommon::MutexLock l(_next_expiration_lock);
01778                         if (_next_expiration == 0 || ttl < _next_expiration)
01779                         {
01780                                 _next_expiration = ttl;
01781                         }
01782                 }
01783 
01784                 void SQLiteBundleStorage::reset_expire_time()
01785                 {
01786                         ibrcommon::MutexLock l(_next_expiration_lock);
01787                         _next_expiration = 0;
01788                 }
01789 
01790                 size_t SQLiteBundleStorage::get_expire_time()
01791                 {
01792                         ibrcommon::MutexLock l(_next_expiration_lock);
01793                         return _next_expiration;
01794                 }
01795 
01796                 void SQLiteBundleStorage::add_deletion(const dtn::data::BundleID &id)
01797                 {
01798                         ibrcommon::MutexLock l(_deletion_mutex);
01799                         _deletion_list.insert(id);
01800                 }
01801 
01802                 void SQLiteBundleStorage::remove_deletion(const dtn::data::BundleID &id)
01803                 {
01804                         ibrcommon::MutexLock l(_deletion_mutex);
01805                         _deletion_list.erase(id);
01806                 }
01807 
01808                 bool SQLiteBundleStorage::contains_deletion(const dtn::data::BundleID &id)
01809                 {
01810                         ibrcommon::MutexLock l(_deletion_mutex);
01811                         return (_deletion_list.find(id) != _deletion_list.end());
01812                 }
01813 
01814                 void SQLiteBundleStorage::trylock() throw (ibrcommon::MutexException)
01815                 {
01816                         // not implemented
01817                         throw ibrcommon::MutexException();
01818                 }
01819 
01820                 void SQLiteBundleStorage::enter() throw (ibrcommon::MutexException)
01821                 {
01822                         // lock all statements
01823                         for (int i = 0; i < SQL_QUERIES_END; i++)
01824                         {
01825                                 // lock the statement
01826                                 _locks[i].enter();
01827                         }
01828                 }
01829 
01830                 void SQLiteBundleStorage::leave() throw (ibrcommon::MutexException)
01831                 {
01832                         // unlock all statements
01833                         for (int i = 0; i < SQL_QUERIES_END; i++)
01834                         {
01835                                 // unlock the statement
01836                                 _locks[i].leave();
01837                         }
01838                 }
01839 
01840                 void SQLiteBundleStorage::set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset) const
01841                 {
01842                         const std::string source_id = id.source.getString();
01843                         sqlite3_bind_text(st, offset + 1, source_id.c_str(), source_id.length(), SQLITE_TRANSIENT);
01844                         sqlite3_bind_int64(st, offset + 2, id.timestamp);
01845                         sqlite3_bind_int64(st, offset + 3, id.sequencenumber);
01846 
01847                         if (id.fragment)
01848                         {
01849                                 sqlite3_bind_int64(st, offset + 4, id.offset);
01850                         }
01851                 }
01852 
01853                 void SQLiteBundleStorage::get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset) const
01854                 {
01855                         id.source = dtn::data::EID((const char*)sqlite3_column_text(st, offset + 0));
01856                         id.timestamp = sqlite3_column_int64(st, offset + 1);
01857                         id.sequencenumber = sqlite3_column_int64(st, offset + 2);
01858 
01859                         id.fragment = (sqlite3_column_text(st, offset + 3) != NULL);
01860                         id.offset = sqlite3_column_int64(st, offset + 3);
01861                 }
01862         }
01863 }