00001
00002
00003
00004
00005
00006
00007
00008
00009 #include "core/SQLiteBundleStorage.h"
00010 #include "core/TimeEvent.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "core/BundleExpiredEvent.h"
00014 #include "core/SQLiteConfigure.h"
00015 #include "core/BundleEvent.h"
00016 #include "core/BundleExpiredEvent.h"
00017
00018 #include <ibrdtn/data/PayloadBlock.h>
00019 #include <ibrdtn/data/Serializer.h>
00020 #include <ibrdtn/data/Bundle.h>
00021 #include <ibrdtn/data/BundleID.h>
00022
00023 #include <ibrcommon/thread/MutexLock.h>
00024 #include <ibrcommon/Exceptions.h>
00025 #include <ibrcommon/data/File.h>
00026 #include <ibrcommon/data/BLOB.h>
00027 #include <ibrcommon/AutoDelete.h>
00028
00029 #include <dirent.h>
00030 #include <iostream>
00031 #include <fstream>
00032 #include <list>
00033 #include <stdio.h>
00034 #include <string.h>
00035
00036
00037 using namespace std;
00038
00039
00040 namespace dtn {
00041 namespace core {
00042
// Names of the SQL tables used by this storage. The rest of this file builds
// its SQL text by indexing this array with the SQL_TABLE_* constants
// (e.g. _tables[SQL_TABLE_BUNDLE]).
// NOTE(review): the order of the names presumably has to match the order of
// the SQL_TABLE_* enumerators declared in the header - confirm before editing.
const std::string SQLiteBundleStorage::_tables[SQL_TABLE_END] =
		{ "Bundles", "Fragments", "Block", "Routing", "BundleRoutingInfo", "NodeRoutingInfo" };
00045
00046 SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const string &file, const int &size)
00047 : global_shutdown(false), dbPath(path.getPath()), dbFile(file), dbSize(size), actual_time(0), nextExpiredTime(0)
00048 {
00049 int err = 0;
00050
00051
00052 SQLiteConfigure::configure();
00053
00054 {
00055
00056 err = sqlite3_open_v2((dbPath+"/"+dbFile).c_str(),&database,SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
00057 if (err){
00058 std::cerr << "Can't open database: " << sqlite3_errmsg(database);
00059 sqlite3_close(database);
00060 throw "Unable to open SQLite Database";
00061 }
00062
00063
00064 sqlite3_stmt *createBundleTable = prepareStatement("create table if not exists "+ _tables[SQL_TABLE_BUNDLE] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, Source text, Destination text, Reportto text, Custodian text, ProcFlags int, Timestamp int, Sequencenumber int, Lifetime int, Fragmentoffset int, Appdatalength int, TTL int, Size int);");
00065 err = sqlite3_step(createBundleTable);
00066 if(err != SQLITE_DONE){
00067 std::cerr << "SQLiteBundleStorage: Constructorfailure: create BundleTable failed " << err;
00068 }
00069 sqlite3_finalize(createBundleTable);
00070
00071
00072 sqlite3_stmt *createFragmentTable = prepareStatement("create table if not exists "+ _tables[SQL_TABLE_FRAGMENT] +" (Key INTEGER PRIMARY KEY ASC, Source text, Timestamp int, Sequencenumber int, Destination text, TTL int, Priority int, FragementationOffset int, PayloadLength int, Payloadname text);");
00073 err = sqlite3_step(createFragmentTable);
00074 if(err != SQLITE_DONE){
00075 std::cerr << "SQLiteBundleStorage: Constructorfailure: create FragemntTable failed " << err;
00076 }
00077 sqlite3_finalize(createFragmentTable);
00078
00079
00080 sqlite3_stmt *createBlockTable = prepareStatement("create table if not exists "+ _tables[SQL_TABLE_BLOCK] +" (Key INTEGER PRIMARY KEY ASC, BundleID text, BlockType int, Filename text, Blocknumber int);");
00081 err = sqlite3_step(createBlockTable);
00082 if(err != SQLITE_DONE){
00083 std::cerr << "SQLiteBundleStorage: Constructorfailure: create BlockTable failed " << err;
00084 }
00085 sqlite3_finalize(createBlockTable);
00086
00087
00088 sqlite3_stmt *createRoutingTable = prepareStatement("create table if not exists "+ _tables[SQL_TABLE_ROUTING] +" (INTEGER PRIMARY KEY ASC, Key int, Routing text);");
00089 err = sqlite3_step(createRoutingTable);
00090 if(err != SQLITE_DONE){
00091 std::cerr << "SQLiteBundleStorage: Constructorfailure: create RoutingTable failed " << err;
00092 }
00093 sqlite3_finalize(createRoutingTable);
00094
00095
00096 sqlite3_stmt *createBundleRoutingInfo = prepareStatement("create table if not exists "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, BundleID text, Key int, Routing text);");
00097 err = sqlite3_step(createBundleRoutingInfo);
00098 if(err != SQLITE_DONE){
00099 std::cerr << "SQLiteBundleStorage: Constructorfailure: create RoutingTable failed " << err;
00100 }
00101 sqlite3_finalize(createBundleRoutingInfo);
00102
00103
00104 sqlite3_stmt *createNodeRoutingInfo = prepareStatement("create table if not exists "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (INTEGER PRIMARY KEY ASC, EID text, Key int, Routing text);");
00105 err = sqlite3_step(createNodeRoutingInfo);
00106 if(err != SQLITE_DONE){
00107 std::cerr << "SQLiteBundleStorage: Constructorfailure: create RoutingTable failed " << err;
00108 }
00109 sqlite3_finalize(createNodeRoutingInfo);
00110
00111
00112 sqlite3_stmt *createDELETETrigger = prepareStatement("CREATE TRIGGER IF NOT EXISTS Blockdelete AFTER DELETE ON "+ _tables[SQL_TABLE_BUNDLE] +" FOR EACH ROW BEGIN DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE BundleID = OLD.BundleID; DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = OLD.BundleID; END;");
00113 err = sqlite3_step(createDELETETrigger);
00114 if(err != SQLITE_DONE){
00115 std::cerr << "SQLiteBundleStorage: Constructorfailure: createTrigger failed " << err;
00116 }
00117 sqlite3_finalize(createDELETETrigger);
00118
00119 sqlite3_stmt *createINSERTTrigger = prepareStatement("CREATE TRIGGER IF NOT EXISTS BundleRoutingdelete BEFORE INSERT ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" BEGIN SELECT CASE WHEN (SELECT BundleID FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = NEW.BundleID) IS NULL THEN RAISE(ABORT, \"Foreign Key Violation: BundleID doesn't exist in BundleTable\")END; END;");
00120 err = sqlite3_step(createINSERTTrigger);
00121 if(err != SQLITE_DONE){
00122 std::cerr << "SQLiteBundleStorage: Constructorfailure: createTrigger failed " << err;
00123 }
00124 sqlite3_finalize(createINSERTTrigger);
00125
00126
00127 sqlite3_stmt *createBlockIDIndex = prepareStatement("CREATE INDEX IF NOT EXISTS BlockIDIndex ON "+ _tables[SQL_TABLE_BLOCK] +" (BundleID);");
00128 err = sqlite3_step(createBlockIDIndex);
00129 if(err != SQLITE_DONE){
00130 std::cerr << "SQLiteBundleStorage: Constructorfailure: createIndex failed " << err;
00131 }
00132 sqlite3_finalize(createBlockIDIndex);
00133
00134 sqlite3_stmt *createTTLIndex = prepareStatement("CREATE INDEX IF NOT EXISTS ttlindex ON "+ _tables[SQL_TABLE_FRAGMENT] +" (TTL);");
00135 err = sqlite3_step(createTTLIndex);
00136 if(err != SQLITE_DONE){
00137 std::cerr << "SQLiteBundleStorage: Constructorfailure: createIndex failed " << err;
00138 }
00139 sqlite3_finalize(createTTLIndex);
00140
00141 sqlite3_stmt *createBundleIDIndex = prepareStatement("CREATE UNIQUE INDEX IF NOT EXISTS BundleIDIndex ON "+ _tables[SQL_TABLE_BUNDLE] +" (BundleID);");
00142 err = sqlite3_step(createBundleIDIndex);
00143 if(err != SQLITE_DONE){
00144 std::cerr << "SQLiteBundleStorage: Constructorfailure: createIndex failed " << err;
00145 }
00146 sqlite3_finalize(createBundleIDIndex);
00147
00148 sqlite3_stmt *createRoutingKeyIndex = prepareStatement("CREATE UNIQUE INDEX IF NOT EXISTS RoutingKeyIndex ON "+ _tables[SQL_TABLE_ROUTING] +" (Key);");
00149 err = sqlite3_step(createRoutingKeyIndex);
00150 if(err != SQLITE_DONE){
00151 std::cerr << "SQLiteBundleStorage: Constructorfailure: createIndex failed " << err;
00152 }
00153 sqlite3_finalize(createRoutingKeyIndex);
00154
00155 sqlite3_stmt *createBundleRoutingIDIndex = prepareStatement("CREATE UNIQUE INDEX IF NOT EXISTS BundleRoutingIDIndex ON "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID,Key);");
00156 err = sqlite3_step(createBundleRoutingIDIndex);
00157 if(err != SQLITE_DONE){
00158 std::cerr << "SQLiteBundleStorage: Constructorfailure: createIndex failed " << err;
00159 }
00160 sqlite3_finalize(createBundleRoutingIDIndex);
00161 }
00162
00163 {
00164 stringstream bundlefolder,fragmentfolder;
00165 bundlefolder << dbPath << "/" << SQL_TABLE_BLOCK;
00166 err = mkdir(bundlefolder.str().c_str(),S_IRWXU | S_IWGRP | S_IROTH);
00167 if (err != EEXIST && err != 0){
00168 cerr << "SQLiteBundleStorage: Create folder failed: "<< strerror(errno) <<endl;
00169 }
00170 fragmentfolder << dbPath << "/" << SQL_TABLE_FRAGMENT;
00171 err = mkdir(fragmentfolder.str().c_str(),S_IRWXU | S_IWGRP | S_IROTH);
00172 if (err != EEXIST && err != 0){
00173 cerr << "SQLiteBundleStorage: Create folder failed: "<< strerror(errno) <<endl;
00174 }
00175 }
00176
00177
00178 getBundleTTL = prepareStatement("SELECT Source,Timestamp,Sequencenumber FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE TTL <= ? ORDER BY TTL ASC;");
00179 getFragmentTTL = prepareStatement("SELECT Source,Timestamp,Sequencenumber,FragementationOffset,Filename,Payloadname FROM "+ _tables[SQL_TABLE_FRAGMENT] +" WHERE TTL <= ? ORDER BY TTL ASC;");
00180 deleteBundleTTL = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE TTL <= ?;");
00181 deleteFragementTTL = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_FRAGMENT] +" WHERE TTL <= ?;");
00182 getBundleByDestination = prepareStatement("SELECT BundleID, Source, Destination, Reportto, Custodian, ProcFlags, Timestamp, Sequencenumber, Lifetime, Fragmentoffset, Appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE Destination = ? ORDER BY TTL ASC;");
00183 getBundleByID = prepareStatement("SELECT Source, Destination, Reportto, Custodian, ProcFlags, Timestamp, Sequencenumber, Lifetime, Fragmentoffset, Appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = ?;");
00184 getFragements = prepareStatement("SELECT * FROM "+ _tables[SQL_TABLE_FRAGMENT] +" WHERE Source = ? AND Timestamp = ? AND Sequencenumber = ? ORDER BY Fragmentoffset ASC;");
00185 store_Bundle = prepareStatement("INSERT INTO "+ _tables[SQL_TABLE_BUNDLE] +" (BundleID, Source, Destination, Reportto, Custodian, ProcFlags, Timestamp, Sequencenumber, Lifetime, Fragmentoffset, Appdatalength, TTL, Size) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);");
00186 store_Fragment = prepareStatement("INSERT INTO "+ _tables[SQL_TABLE_FRAGMENT] +" (Source, Timestamp, Sequencenumber, Destination, TTL, Priority, FragementationOffset, PayloadLength, Payloadname) VALUES (?,?,?,?,?,?,?,?,?);");
00187 clearBundles = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +";");
00188 clearFragments = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_FRAGMENT] +";");
00189 clearBlocks = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_BLOCK] +";");
00190 clearRouting = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +";");
00191 clearNodeRouting = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +";");
00192 countEntries = prepareStatement("SELECT Count(ROWID) From "+ _tables[SQL_TABLE_BUNDLE] +";");
00193 vacuum = prepareStatement("vacuum;");
00194 getROWID = prepareStatement("select ROWID from "+ _tables[SQL_TABLE_BUNDLE] +";");
00195 removeBundle = prepareStatement("Delete From "+ _tables[SQL_TABLE_BUNDLE] +" Where BundleID = ?;");
00196 removeFragments = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_FRAGMENT] +" WHERE Source = ? AND Timestamp = ? AND Sequencenumber = ?;");
00197 getBlocksByID = prepareStatement("SELECT Filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE BundleID = ? ORDER BY Blocknumber ASC;");
00198 storeBlock = prepareStatement("INSERT INTO "+ _tables[SQL_TABLE_BLOCK] +" (BundleID, BlockType, Filename, Blocknumber) VALUES (?,?,?,?);");
00199 getBundlesBySize = prepareStatement("SELECT BundleID, Source, Destination, Reportto, Custodian, ProcFlags, Timestamp, Sequencenumber, Lifetime, Fragmentoffset, Appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY Size DESC LIMIT ?");
00200
00201 getBundleBySource = prepareStatement("SELECT BundleID, Destination, Reportto, Custodian, ProcFlags, Timestamp, Sequencenumber, Lifetime, Fragmentoffset, Appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE Source = ?;");
00202 getBundlesByTTL = prepareStatement("SELECT BundleID, Source, Destination, Reportto, Custodian, ProcFlags, Timestamp, Sequencenumber, Lifetime, Fragmentoffset, Appdatalength FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY TTL ASC LIMIT ?");
00203 getBlocks = prepareStatement("SELECT Filename FROM "+ _tables[SQL_TABLE_BLOCK] +" WHERE BundleID = ? AND Blocknumber = ?;");
00204 getProcFlags = prepareStatement("SELECT ProcFlags From "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = ?;");
00205 updateProcFlags = prepareStatement("UPDATE "+ _tables[SQL_TABLE_BUNDLE] +" SET ProcFlags = ? WHERE BundleID = ?;");
00206 storeBundleRouting = prepareStatement("INSERT INTO "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" (BundleID, Key, Routing) VALUES (?,?,?);");
00207 getBundleRouting = prepareStatement("SELECT Routing FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;");
00208 removeBundleRouting = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_BUNDLE_ROUTING_INFO] +" WHERE BundleID = ? AND Key = ?;");
00209 storeRouting = prepareStatement("INSERT INTO "+ _tables[SQL_TABLE_ROUTING] +" (Key, Routing) VALUES (?,?);");
00210 getRouting = prepareStatement("SELECT Routing FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;");
00211 removeRouting = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_ROUTING] +" WHERE Key = ?;");
00212 storeNodeRouting = prepareStatement("INSERT INTO "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" (EID, Key, Routing) VALUES (?,?,?);");
00213 getNodeRouting = prepareStatement("SELECT Routing FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;");
00214 removeNodeRouting = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_NODE_ROUTING_INFO] +" WHERE EID = ? AND Key = ?;");
00215 getnextExpiredBundle = prepareStatement("SELECT TTL FROM "+ _tables[SQL_TABLE_BUNDLE] +" ORDER BY TTL ASC LIMIT 1;");
00216 getnextExpiredFragment = prepareStatement("SELECT TTL FROM "+ _tables[SQL_TABLE_FRAGMENT] +" ORDER BY TTL ASC LIMIT 1;");
00217
00218
00219 consistenceCheck();
00220
00221
00222 bindEvent(TimeEvent::className);
00223 bindEvent(GlobalEvent::className);
00224 }
00225
00226 SQLiteBundleStorage::~SQLiteBundleStorage(){
00227
00228 ibrcommon::MutexLock lock(dbMutex);
00229 join();
00230
00231 sqlite3_finalize(getBundleTTL);
00232 sqlite3_finalize(getFragmentTTL);
00233 sqlite3_finalize(deleteBundleTTL);
00234 sqlite3_finalize(getBundleByDestination);
00235 sqlite3_finalize(deleteFragementTTL);
00236 sqlite3_finalize(getBundleByID);
00237 sqlite3_finalize(getFragements);
00238 sqlite3_finalize(store_Bundle);
00239 sqlite3_finalize(store_Fragment);
00240 sqlite3_finalize(clearBundles);
00241 sqlite3_finalize(clearFragments);
00242 sqlite3_finalize(clearBlocks);
00243 sqlite3_finalize(clearRouting);
00244 sqlite3_finalize(clearNodeRouting);
00245 sqlite3_finalize(countEntries);
00246 sqlite3_finalize(vacuum);
00247 sqlite3_finalize(getROWID);
00248 sqlite3_finalize(removeBundle);
00249 sqlite3_finalize(removeFragments);
00250 sqlite3_finalize(getBlocksByID);
00251 sqlite3_finalize(storeBlock);
00252 sqlite3_finalize(getBundlesBySize);
00253 sqlite3_finalize(getBundleBySource);
00254 sqlite3_finalize(getBundlesByTTL);
00255 sqlite3_finalize(getBlocks);
00256 sqlite3_finalize(getProcFlags);
00257 sqlite3_finalize(updateProcFlags);
00258 sqlite3_finalize(getBundleRouting);
00259 sqlite3_finalize(storeBundleRouting);
00260 sqlite3_finalize(removeBundleRouting);
00261 sqlite3_finalize(storeRouting);
00262 sqlite3_finalize(getRouting);
00263 sqlite3_finalize(removeRouting);
00264 sqlite3_finalize(storeNodeRouting);
00265 sqlite3_finalize(getNodeRouting);
00266 sqlite3_finalize(removeNodeRouting);
00267 sqlite3_finalize(getnextExpiredBundle);
00268 sqlite3_finalize(getnextExpiredFragment);
00269
00270
00271
00272 int err = sqlite3_close(database);
00273 if (err != SQLITE_OK){
00274 cerr <<"unable to close Database" <<endl;
00275 }
00276
00277
00278 unbindEvent(TimeEvent::className);
00279 unbindEvent(GlobalEvent::className);
00280 }
00281
/**
 * Compares the payload files found on disk with the file names recorded in
 * the database and removes whatever does not match:
 *  - bundles whose block file is missing are announced with a BundleEvent
 *    and deleted from the bundle table,
 *  - fragments whose payload file is missing are announced and deleted,
 *  - files which are not referenced by the database are removed from disk.
 * Called from the constructor after all statements have been prepared.
 */
void SQLiteBundleStorage::consistenceCheck(){
	set<string> fileList;                  // block files on disk; ends up holding the files to delete
	set<string> fragmentList;              // fragment payload files on disk
	set<string>::iterator fileList_iterator, fileList_iterator2;
	set<string> dbfragmentlist;            // fragment files which are referenced by the database
	set<string> eventList;                 // IDs of bundles/fragments with missing files
	set<string>::iterator event_Iterator;
	string source, datei;
	string filename, id, payloadfilename;
	int err;
	size_t timestamp, sequencenumber, offset;
	stringstream directory,directory2;

	list<ibrcommon::File> blist, flist;
	list<ibrcommon::File>::iterator file_it;

	// same folder names as the ones created by the constructor
	directory << dbPath << "/" << SQL_TABLE_BLOCK;
	directory2 << dbPath << "/" << SQL_TABLE_FRAGMENT;

	ibrcommon::File folder(directory.str());
	ibrcommon::File folder2(directory2.str());

	// collect the files of both folders, skipping every path that ends with '.'
	// NOTE(review): datei[datei.size()-1] assumes getPath() never returns an empty string - confirm.
	folder.getFiles(blist);
	for(file_it = blist.begin(); file_it != blist.end(); file_it++){
		datei = (*file_it).getPath();
		if(datei[datei.size()-1] != '.'){
			fileList.insert(datei);
		}
	}
	folder2.getFiles(flist);
	for(file_it = flist.begin(); file_it != flist.end(); file_it++){
		datei = (*file_it).getPath();
		if(datei[datei.size()-1] != '.'){
			fragmentList.insert(datei);
		}
	}

	// walk through all blocks in the database: a block without a file marks its
	// bundle as inconsistent, a block with a file takes that file off the list
	int i;
	sqlite3_stmt *bundleconistencycheck = prepareStatement("SELECT Filename,BundleID FROM "+ _tables[SQL_TABLE_BLOCK] +";");
	for(i = sqlite3_step(bundleconistencycheck); i == SQLITE_ROW; i=sqlite3_step(bundleconistencycheck)){
		filename = (const char*)sqlite3_column_text(bundleconistencycheck,0);
		id = (const char*)sqlite3_column_text(bundleconistencycheck,1);
		fileList_iterator = fileList.find(filename);
		if(fileList_iterator == fileList.end()){
			eventList.insert(id);
		}
		else{
			fileList.erase(fileList_iterator);
		}
	}
	sqlite3_finalize(bundleconistencycheck);

	// remove every inconsistent bundle
	if(eventList.size() != 0){
		sqlite3_stmt *deleteBundle = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_BUNDLE] +" WHERE BundleID = ?;");
		for(event_Iterator = eventList.begin(); event_Iterator != eventList.end(); event_Iterator++){
			// announce the deletion of the bundle
			// NOTE(review): "mb(id, ...)" is not valid C++ as written; the remaining
			// constructor arguments appear to be missing from this listing.
			const dtn::data::BundleID id(*event_Iterator);
			const dtn::data::MetaBundle mb(id, ...);
			dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);

			// put the remaining block files of this bundle on the list of files to delete
			sqlite3_bind_text(getBlocksByID,1,(*event_Iterator).c_str(),(*event_Iterator).length(), SQLITE_TRANSIENT);
			for(err = sqlite3_step(getBlocksByID); err == SQLITE_ROW; err = sqlite3_step(getBlocksByID)){
				filename = (const char*)sqlite3_column_text(getBlocksByID,0);
				fileList.insert(filename.c_str());
			}
			sqlite3_reset(getBlocksByID);
			if ( err != SQLITE_DONE )
			{
				std::cerr << "SQLiteBundlestorage: consistenceCheck: failure while getting Blocks: " << err << std::endl;
			}

			// delete the bundle itself; the Blockdelete trigger created in the
			// constructor deletes the block rows along with it
			sqlite3_bind_text(deleteBundle,1,(*event_Iterator).c_str(),(*event_Iterator).length(), SQLITE_TRANSIENT);
			err = sqlite3_step(deleteBundle);
			sqlite3_reset(deleteBundle);
			if ( err != SQLITE_DONE )
			{
				std::cerr << "SQLiteBundlestorage: failure while deleting inconsistent Data: " << err << std::endl;
			}
		}
		sqlite3_finalize(deleteBundle);
		eventList.clear();
	}

	// walk through all fragments in the database and look for their payload files
	sqlite3_stmt *fragmentconistencycheck = prepareStatement("SELECT Source, Timestamp, Sequencenumber, FragementationOffset, Payloadname FROM "+ _tables[SQL_TABLE_FRAGMENT] +";");
	for ( int i = sqlite3_step(fragmentconistencycheck); i == SQLITE_ROW; i = sqlite3_step(fragmentconistencycheck)){
		payloadfilename = (const char*) sqlite3_column_text(fragmentconistencycheck,4);
		fileList_iterator = fragmentList.find(payloadfilename);
		if( fileList_iterator == fragmentList.end()){
			// payload file is missing: build the ID string "[timestamp.sequencenumber.offset] source"
			source = (const char*) sqlite3_column_text(fragmentconistencycheck,0);
			timestamp = sqlite3_column_int64(fragmentconistencycheck,1);
			sequencenumber = sqlite3_column_int64(fragmentconistencycheck,2);
			offset = sqlite3_column_int64(fragmentconistencycheck,3);
			stringstream strom;
			strom << "[" << timestamp << "." << sequencenumber << "." << offset << "] " << source;
			eventList.insert(strom.str());
		}
		else{
			// payload file exists: remember it as referenced by the database
			dbfragmentlist.insert((*fileList_iterator));
		}
	}
	sqlite3_finalize(fragmentconistencycheck);

	// remove every inconsistent fragment
	if(eventList.size() != 0){
		// NOTE(review): the Fragments table created in the constructor has no
		// BundleID column, so this statement presumably fails to prepare and the
		// deletions below report an error - confirm and fix together with the schema.
		sqlite3_stmt *deleteFragment = prepareStatement("DELETE FROM "+ _tables[SQL_TABLE_FRAGMENT] +" WHERE BundleID = ?;");
		for(event_Iterator = eventList.begin(); event_Iterator != eventList.end(); event_Iterator++){
			// announce the deletion of the fragment
			// NOTE(review): same "mb(id, ...)" placeholder as above.
			const dtn::data::BundleID id(*event_Iterator);
			const dtn::data::MetaBundle mb(id, ...);
			dtn::core::BundleEvent::raise(mb, BUNDLE_DELETED, dtn::data::StatusReportBlock::DEPLETED_STORAGE);

			sqlite3_bind_text(deleteFragment,1,(*event_Iterator).c_str(),(*event_Iterator).length(), SQLITE_TRANSIENT);
			err = sqlite3_step(deleteFragment);
			if ( err != SQLITE_DONE )
			{
				std::cerr << "SQLiteBundlestorage: failure while deleting inconsistent Data: " << err << std::endl;
			}
			sqlite3_reset(deleteFragment);
		}
		sqlite3_finalize(deleteFragment);
		eventList.clear();
	}

	// delete the block files which are left on the list: files without a
	// database entry and files of the bundles deleted above
	if(fileList.size() != 0){
		for(fileList_iterator = fileList.begin(); fileList_iterator != fileList.end(); fileList_iterator++){
			::remove((*fileList_iterator).c_str());
			clog << "Warning: the file " << (*fileList_iterator) << " was deleted" <<endl;
		}
	}

	// delete the fragment files which are not referenced by the database
	for(fileList_iterator = fragmentList.begin(); fileList_iterator != fragmentList.end(); fileList_iterator++){
		fileList_iterator2 = dbfragmentlist.find((*fileList_iterator));

		if(fileList_iterator2 == dbfragmentlist.end()){
			::remove((*fileList_iterator).c_str());
			clog << "Warning: the file " << (*fileList_iterator) << " was deleted" <<endl;
		}
	}
}
00439
00440 sqlite3_stmt* SQLiteBundleStorage::prepareStatement(string sqlquery){
00441
00442 sqlite3_stmt *statement;
00443 int err = sqlite3_prepare_v2(database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00444 if ( err != SQLITE_OK )
00445 {
00446 std::cerr << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << std::endl;
00447 }
00448 return statement;
00449 }
00450
00451 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id){
00452 int err;
00453 size_t procflag;
00454 data::Bundle bundle;
00455
00456
00457 string source, destination, reportto, custodian;
00458
00459
00460 string ID = id.toString();
00461
00462 {
00463 ibrcommon::MutexLock lock(dbMutex);
00464 sqlite3_bind_text(getBundleByID, 1, ID.c_str(), ID.length(),SQLITE_TRANSIENT);
00465
00466
00467 err = sqlite3_step(getBundleByID);
00468
00469
00470 if(err != SQLITE_ROW){
00471 sqlite3_reset(getBundleByID);
00472 stringstream error;
00473 error << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString();
00474 std::cerr << error.str();
00475 throw SQLiteQuerryException(error.str());
00476 }
00477 source = (const char*) sqlite3_column_text(getBundleByID, 0);
00478 destination = (const char*) sqlite3_column_text(getBundleByID, 1);
00479 reportto = (const char*) sqlite3_column_text(getBundleByID, 2);
00480 custodian = (const char*) sqlite3_column_text(getBundleByID, 3);
00481 procflag = sqlite3_column_int(getBundleByID, 4);
00482 bundle._timestamp = sqlite3_column_int64(getBundleByID, 5);
00483 bundle._sequencenumber = sqlite3_column_int64(getBundleByID, 6);
00484 bundle._lifetime = sqlite3_column_int64(getBundleByID, 7);
00485 if(procflag & data::Bundle::FRAGMENT){
00486 bundle._fragmentoffset = sqlite3_column_int64(getBundleByID, 8);
00487 bundle._appdatalength = sqlite3_column_int64(getBundleByID,9);
00488 }
00489
00490 err = sqlite3_step(getBundleByID);
00491
00492
00493 sqlite3_reset(getBundleByID);
00494
00495 if (err != SQLITE_DONE){
00496 throw "SQLiteBundleStorage: Database contains two or more Bundle with the same BundleID, the requested Bundle is maybe a Fragment";
00497 }
00498
00499
00500 readBlocks(bundle,id.toString());
00501 }
00502
00503 bundle._source = data::EID(source);
00504 bundle._destination = data::EID(destination);
00505 bundle._reportto = data::EID(reportto);
00506 bundle._custodian = data::EID(custodian);
00507 bundle._procflags = procflag;
00508
00509 return bundle;
00510 }
00511
00512 const dtn::data::MetaBundle SQLiteBundleStorage::getByDestination(const dtn::data::EID &eid, bool exact)
00513 {
00514 int err;
00515 size_t procflag;
00516 dtn::data::MetaBundle bundle;
00517
00518
00519 const char *source, *destination, *reportto, *custodian;
00520 string ID(eid.getString()), bundleID;
00521
00522 {
00523 ibrcommon::MutexLock lock(dbMutex);
00524
00525 sqlite3_bind_text(getBundleByDestination, 1, ID.c_str(), ID.length(),SQLITE_TRANSIENT);
00526
00527
00528 err = sqlite3_step(getBundleByDestination);
00529
00530
00531 if(err != SQLITE_ROW){
00532 sqlite3_reset(getBundleByDestination);
00533 throw NoBundleFoundException();
00534 }
00535 bundleID = (const char*) sqlite3_column_text(getBundleByDestination, 0);
00536 source = (const char*) sqlite3_column_text(getBundleByDestination, 1);
00537 destination = (const char*) sqlite3_column_text(getBundleByDestination, 2);
00538 reportto = (const char*) sqlite3_column_text(getBundleByDestination, 3);
00539 custodian = (const char*) sqlite3_column_text(getBundleByDestination, 4);
00540 procflag = sqlite3_column_int(getBundleByDestination, 5);
00541 bundle.timestamp = sqlite3_column_int64(getBundleByDestination, 6);
00542 bundle.sequencenumber = sqlite3_column_int64(getBundleByDestination, 7);
00543 bundle.lifetime = sqlite3_column_int64(getBundleByDestination, 8);
00544 if(procflag & data::Bundle::FRAGMENT)
00545 {
00546 bundle.fragment = true;
00547 bundle.offset = sqlite3_column_int64(getBundleByDestination, 9);
00548 bundle.appdatalength = sqlite3_column_int64(getBundleByDestination,10);
00549 }
00550
00551 bundle.received = 0;
00552 bundle.source = data::EID((string)source);
00553 bundle.destination = data::EID((string)destination);
00554 bundle.reportto = data::EID((string)reportto);
00555 bundle.custodian = data::EID((string)custodian);
00556 bundle.procflags = procflag;
00557
00558
00559
00560 sqlite3_reset(getBundleByDestination);
00561 }
00562
00563 return bundle;
00564 }
00565
/**
 * Lookup by bloom filter. As written the filter is never inspected: the
 * method unconditionally throws BundleStorage::NoBundleFoundException.
 *
 * @param filter  not evaluated
 * @throws BundleStorage::NoBundleFoundException  always
 */
const dtn::data::MetaBundle SQLiteBundleStorage::getByFilter(const ibrcommon::BloomFilter &filter)
{
	throw BundleStorage::NoBundleFoundException();
}
00571
00572 list<data::Bundle> SQLiteBundleStorage::getBundleByTTL (const int &limit){
00573 int err;
00574 size_t procflags;
00575 string destination, reportto, custodian, bundleID, sourceID;
00576 list<data::Bundle> result;
00577 {
00578 ibrcommon::MutexLock lock(dbMutex);
00579 sqlite3_bind_int(getBundlesByTTL, 1, limit);
00580 err = sqlite3_step(getBundlesByTTL);
00581
00582 while(err == SQLITE_ROW){
00583 data::Bundle bundle;
00584 stringstream stream_bundleid;
00585 bundleID = (const char*) sqlite3_column_text(getBundlesByTTL, 0);
00586 sourceID = (const char*) sqlite3_column_text(getBundlesByTTL, 1);
00587 destination = (const char*) sqlite3_column_text(getBundlesByTTL, 2);
00588 reportto = (const char*) sqlite3_column_text(getBundlesByTTL, 3);
00589 custodian = (const char*) sqlite3_column_text(getBundlesByTTL, 4);
00590 procflags = sqlite3_column_int(getBundlesByTTL,5);
00591 bundle._timestamp = sqlite3_column_int64(getBundlesByTTL, 6);
00592 bundle._sequencenumber = sqlite3_column_int64(getBundlesByTTL, 7);
00593 bundle._lifetime = sqlite3_column_int64(getBundlesByTTL, 8);
00594
00595 if(procflags & data::Bundle::FRAGMENT){
00596 bundle._fragmentoffset = sqlite3_column_int64(getBundlesByTTL,9);
00597 bundle._appdatalength = sqlite3_column_int64(getBundlesByTTL,10);
00598 }
00599
00600 bundle._source = data::EID(sourceID);
00601 bundle._destination = data::EID(destination);
00602 bundle._reportto = data::EID(reportto);
00603 bundle._custodian = data::EID(custodian);
00604 bundle._procflags = procflags;
00605
00606
00607 readBlocks(bundle,bundleID);
00608
00609
00610 result.push_back(bundle);
00611 err = sqlite3_step(getBundlesByTTL);
00612 }
00613 sqlite3_reset(getBundlesByTTL);
00614
00615 if(err != SQLITE_DONE){
00616 stringstream error;
00617 error << "getBundleBySource: error while executing querry: " << err;
00618 std::cerr << error.str();
00619 throw SQLiteQuerryException(error.str());
00620 }
00621 }
00622 return result;
00623 }
00624
00625 list<data::Bundle> SQLiteBundleStorage::getBundlesBySource(const dtn::data::EID &sourceID){
00626 int err;
00627 size_t procflags;
00628 string destination, reportto, custodian, bundleID;
00629 list<data::Bundle> result;
00630 {
00631 ibrcommon::MutexLock lock(dbMutex);
00632 sqlite3_bind_text(getBundleBySource, 1, sourceID.getString().c_str(), sourceID.getString().length(),SQLITE_TRANSIENT);
00633 err = sqlite3_step(getBundleBySource);
00634
00635 while(err == SQLITE_ROW){
00636 data::Bundle bundle;
00637 stringstream stream_bundleid;
00638 bundleID = (const char*) sqlite3_column_text(getBundleBySource, 0);
00639 destination = (const char*) sqlite3_column_text(getBundleBySource, 1);
00640 reportto = (const char*) sqlite3_column_text(getBundleBySource, 2);
00641 custodian = (const char*) sqlite3_column_text(getBundleBySource, 3);
00642 procflags = sqlite3_column_int(getBundleBySource,4);
00643 bundle._timestamp = sqlite3_column_int64(getBundleBySource, 5);
00644 bundle._sequencenumber = sqlite3_column_int64(getBundleBySource, 6);
00645 bundle._lifetime = sqlite3_column_int64(getBundleBySource, 7);
00646
00647 if(procflags & data::Bundle::FRAGMENT){
00648 bundle._fragmentoffset = sqlite3_column_int64(getBundleBySource, 8);
00649 bundle._appdatalength = sqlite3_column_int64(getBundleBySource,9);
00650 }
00651
00652 bundle._source = sourceID;
00653 bundle._destination = data::EID(destination);
00654 bundle._reportto = data::EID(reportto);
00655 bundle._custodian = data::EID(custodian);
00656 bundle._procflags = procflags;
00657
00658
00659 readBlocks(bundle,bundleID);
00660
00661
00662 result.push_back(bundle);
00663 err = sqlite3_step(getBundleBySource);
00664 }
00665 sqlite3_reset(getBundleBySource);
00666
00667 if(err != SQLITE_DONE){
00668 stringstream error;
00669 error << "getBundleBySource: error while executing querry: " << err;
00670 std::cerr << error.str();
00671 throw SQLiteQuerryException(error.str());
00672 }
00673 }
00674 return result;
00675 }
00676
00677 list<data::Bundle> SQLiteBundleStorage::getBundlesByDestination(const dtn::data::EID &destinationID){
00678 int err;
00679 size_t procflags;
00680 const char *sourceID, *reportto, *custodian, *bundleID;
00681 list<data::Bundle> result;
00682 {
00683 ibrcommon::MutexLock lock(dbMutex);
00684 sqlite3_bind_text(getBundleByDestination, 1, destinationID.getString().c_str(), destinationID.getString().length(),SQLITE_TRANSIENT);
00685 err = sqlite3_step(getBundleByDestination);
00686
00687 while(err == SQLITE_ROW){
00688 data::Bundle bundle;
00689 stringstream stream_bundleid;
00690 bundleID = (const char*) sqlite3_column_text(getBundleByDestination, 0);
00691 sourceID = (const char*) sqlite3_column_text(getBundleByDestination, 1);
00692 reportto = (const char*) sqlite3_column_text(getBundleByDestination, 3);
00693 custodian = (const char*) sqlite3_column_text(getBundleByDestination, 4);
00694 procflags = sqlite3_column_int(getBundleByDestination, 5);
00695 bundle._timestamp = sqlite3_column_int64(getBundleByDestination, 6);
00696 bundle._sequencenumber = sqlite3_column_int64(getBundleByDestination, 7);
00697 bundle._lifetime = sqlite3_column_int64(getBundleByDestination, 8);
00698
00699 if(procflags & data::Bundle::FRAGMENT){
00700 bundle._fragmentoffset = sqlite3_column_int64(getBundleByDestination,9);
00701 bundle._appdatalength = sqlite3_column_int64(getBundleByDestination,10);
00702 }
00703
00704 bundle._source = data::EID((string)sourceID);
00705 bundle._destination = destinationID;
00706 bundle._reportto = data::EID((string)reportto);
00707 bundle._custodian = data::EID((string)custodian);
00708 bundle._procflags = procflags;
00709
00710
00711 readBlocks(bundle,(string)bundleID);
00712
00713
00714 result.push_back(bundle);
00715 err = sqlite3_step(getBundleByDestination);
00716 }
00717 sqlite3_reset(getBundleByDestination);
00718
00719 if(err != SQLITE_DONE){
00720 stringstream error;
00721 error << "getBundleBySource: error while executing querry: " << err;
00722 std::cerr << error.str();
00723 throw SQLiteQuerryException(error.str());
00724 }
00725 }
00726 return result;
00727 }
00728
00729 list<data::Bundle> SQLiteBundleStorage::getBundleBySize(const int &limit){
00730 list<data::Bundle> result;
00731 const char *bundleID, *sourceID, *reportto, *custodian, *destinationID;
00732 size_t procflags;
00733 int err;
00734 {
00735 ibrcommon::MutexLock lock(dbMutex);
00736 sqlite3_bind_int(getBundlesBySize, 1, limit);
00737 err = sqlite3_step(getBundlesBySize);
00738 while(err == SQLITE_ROW){
00739 data::Bundle bundle;
00740 stringstream stream_bundleid;
00741 bundleID = (const char*) sqlite3_column_text(getBundlesBySize, 0);
00742 sourceID = (const char*) sqlite3_column_text(getBundlesBySize, 1);
00743 destinationID = (const char*) sqlite3_column_text(getBundlesBySize, 2);
00744 reportto = (const char*) sqlite3_column_text(getBundlesBySize, 3);
00745 custodian = (const char*) sqlite3_column_text(getBundlesBySize, 4);
00746 procflags = sqlite3_column_int(getBundlesBySize, 5);
00747 bundle._timestamp = sqlite3_column_int64(getBundlesBySize, 6);
00748 bundle._sequencenumber = sqlite3_column_int64(getBundlesBySize, 7);
00749 bundle._lifetime = sqlite3_column_int64(getBundlesBySize, 8);
00750
00751 if(procflags & data::Bundle::FRAGMENT){
00752 bundle._fragmentoffset = sqlite3_column_int64(getBundlesBySize,9);
00753 bundle._appdatalength = sqlite3_column_int64(getBundlesBySize,10);
00754 }
00755
00756 bundle._source = data::EID((string)sourceID);
00757 bundle._destination = data::EID((string)destinationID);
00758 bundle._reportto = data::EID((string)reportto);
00759 bundle._custodian = data::EID((string)custodian);
00760 bundle._procflags = procflags;
00761
00762
00763 readBlocks(bundle,(string)bundleID);
00764
00765
00766 result.push_back(bundle);
00767 err = sqlite3_step(getBundlesBySize);
00768 }
00769 sqlite3_reset(getBundlesBySize);
00770
00771 if(err != SQLITE_DONE){
00772 stringstream error;
00773 error << "getBundleBySource: error while executing querry: " << err;
00774 std::cerr << error.str();
00775 throw SQLiteQuerryException(error.str());
00776 }
00777 }
00778
00779 return result;
00780 }
00781
00782
00783 std::string SQLiteBundleStorage::getBundleRoutingInfo(const data::BundleID &bundleID, const int &key){
00784 int err;
00785 string result;
00786 {
00787 ibrcommon::MutexLock lock(dbMutex);
00788 sqlite3_bind_text(getBundleRouting,1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00789 sqlite3_bind_int(getBundleRouting,2,key);
00790 err = sqlite3_step(getBundleRouting);
00791 if(err == SQLITE_ROW){
00792 result = (const char*) sqlite3_column_text(getBundleRouting,0);
00793 }
00794 sqlite3_reset(getBundleRouting);
00795 if(err != SQLITE_DONE && err != SQLITE_ROW){
00796 stringstream error;
00797 error << "SQLiteBundleStorage: getBundleRoutingInfo: " << err << " errmsg: " << err;
00798 std::cerr << error.str();
00799 throw SQLiteQuerryException(error.str());
00800 }
00801 }
00802 return result;
00803 }
00804
00805 std::string SQLiteBundleStorage::getNodeRoutingInfo(const data::EID &eid, const int &key){
00806 int err;
00807 string result;
00808 {
00809 ibrcommon::MutexLock lock(dbMutex);
00810 err = sqlite3_bind_text(getNodeRouting,1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
00811 err = sqlite3_bind_int(getNodeRouting,2,key);
00812 err = sqlite3_step(getNodeRouting);
00813 if(err == SQLITE_ROW){
00814 result = (const char*) sqlite3_column_text(getNodeRouting,0);
00815 }
00816 sqlite3_reset(getNodeRouting);
00817 if(err != SQLITE_DONE && err != SQLITE_ROW){
00818 stringstream error;
00819 error << "SQLiteBundleStorage: getNodeRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(database);
00820 std::cerr << error.str();
00821 throw SQLiteQuerryException(error.str());
00822 }
00823 }
00824 return result;
00825 }
00826
00827 std::string SQLiteBundleStorage::getRoutingInfo(const int &key){
00828 int err;
00829 string result;
00830 {
00831 ibrcommon::MutexLock lock(dbMutex);
00832 sqlite3_bind_int(getRouting,1,key);
00833 err = sqlite3_step(getRouting);
00834 if(err == SQLITE_ROW){
00835 result = (const char*) sqlite3_column_text(getRouting,0);
00836 }
00837 sqlite3_reset(getRouting);
00838 if(err != SQLITE_DONE && err != SQLITE_ROW){
00839 stringstream error;
00840 error << "SQLiteBundleStorage: getRoutingInfo: " << err << " errmsg: " << sqlite3_errmsg(database);
00841 std::cerr << error.str();
00842 throw SQLiteQuerryException(error.str());
00843 }
00844 }
00845 return result;
00846 }
00847
00848 void SQLiteBundleStorage::setPriority(const int priority,const dtn::data::BundleID &id){
00849 int err;
00850 size_t procflags;
00851 {
00852 ibrcommon::MutexLock lock(dbMutex);
00853 sqlite3_bind_text(getProcFlags,1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00854 err = sqlite3_step(getProcFlags);
00855 if(err == SQLITE_ROW)
00856 procflags = sqlite3_column_int(getProcFlags,0);
00857 sqlite3_reset(getProcFlags);
00858 if(err != SQLITE_DONE){
00859 stringstream error;
00860 error << "SQLiteBundleStorage: error while Select Querry: " << err;
00861 std::cerr << error.str();
00862 throw SQLiteQuerryException(error.str());
00863 }
00864
00865
00866 procflags -= ( 128*(procflags & dtn::data::Bundle::PRIORITY_BIT1) + 256*(procflags & dtn::data::Bundle::PRIORITY_BIT2));
00867
00868
00869 switch(priority){
00870 case 1: procflags += data::Bundle::PRIORITY_BIT1; break;
00871 case 2: procflags += data::Bundle::PRIORITY_BIT2; break;
00872 case 3: procflags += (data::Bundle::PRIORITY_BIT1 + data::Bundle::PRIORITY_BIT2); break;
00873 }
00874
00875 sqlite3_bind_text(updateProcFlags,1,id.toString().c_str(), id.toString().length(), SQLITE_TRANSIENT);
00876 sqlite3_bind_int64(updateProcFlags,2,priority);
00877 sqlite3_step(updateProcFlags);
00878 sqlite3_reset(updateProcFlags);
00879 if(err != SQLITE_DONE){
00880 stringstream error;
00881 error << "SQLiteBundleStorage: setPriority error while Select Querry: " << err;
00882 std::cerr << error.str();
00883 throw SQLiteQuerryException(error.str());
00884 }
00885 }
00886 }
00887
00888 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle){
00889 bool local = bundle._destination.getNodeEID() == BundleCore::local.getNodeEID();
00890 if(bundle._procflags & dtn::data::Bundle::FRAGMENT && local){
00891
00892 storeFragment(bundle);
00893 return;
00894 }
00895 int err;
00896 size_t TTL;
00897 stringstream stream_bundleid, completefilename;
00898 string bundlestring, destination;
00899
00900 destination = bundle._destination.getString();
00901 stream_bundleid << "[" << bundle._timestamp << "." << bundle._sequencenumber;
00902 if (bundle._procflags & dtn::data::Bundle::FRAGMENT)
00903 {
00904 stream_bundleid << "." << bundle._fragmentoffset;
00905 }
00906 stream_bundleid << "] " << bundle._source.getString();
00907 TTL = bundle._timestamp + bundle._lifetime;
00908 {
00909 ibrcommon::MutexLock lock(dbMutex);
00910
00911 int size = storeBlocks(bundle);
00912
00913 sqlite3_bind_text(store_Bundle, 1, stream_bundleid.str().c_str(), stream_bundleid.str().length(),SQLITE_TRANSIENT);
00914 sqlite3_bind_text(store_Bundle, 2,bundle._source.getString().c_str(), bundle._source.getString().length(),SQLITE_TRANSIENT);
00915 sqlite3_bind_text(store_Bundle, 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
00916 sqlite3_bind_text(store_Bundle, 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
00917 sqlite3_bind_text(store_Bundle, 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
00918 sqlite3_bind_int(store_Bundle,6,bundle._procflags);
00919 sqlite3_bind_int64(store_Bundle,7,bundle._timestamp);
00920 sqlite3_bind_int(store_Bundle,8,bundle._sequencenumber);
00921 sqlite3_bind_int64(store_Bundle,9,bundle._lifetime);
00922 sqlite3_bind_int64(store_Bundle,10,0);
00923 sqlite3_bind_int64(store_Bundle,11,0);
00924 sqlite3_bind_int64(store_Bundle,12,TTL);
00925 sqlite3_bind_int(store_Bundle,13,bundle._appdatalength);
00926 err = sqlite3_step(store_Bundle);
00927 if(err == SQLITE_CONSTRAINT)
00928 cerr << "warning: Bundle is already in the storage"<<endl;
00929 else if(err != SQLITE_DONE){
00930 stringstream error;
00931 error << "SQLiteBundleStorage: store() failure: "<< err << " " << sqlite3_errmsg(database);
00932 std::cerr << error.str();
00933 throw SQLiteQuerryException(error.str());
00934 }
00935 sqlite3_reset(store_Bundle);
00936 }
00937 {
00938 ibrcommon::MutexLock lock = ibrcommon::MutexLock(time_change);
00939 if(nextExpiredTime == 0 || TTL < nextExpiredTime){
00940 nextExpiredTime = TTL;
00941 }
00942 }
00943 }
00944
00945 void SQLiteBundleStorage::storeBundleRoutingInfo(const data::BundleID &bundleID, const int &key, const string &routingInfo){
00946 int err;
00947 {
00948 ibrcommon::MutexLock lock(dbMutex);
00949 sqlite3_bind_text(storeBundleRouting,1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
00950 sqlite3_bind_int(storeBundleRouting,2,key);
00951 sqlite3_bind_text(storeBundleRouting,3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00952 err = sqlite3_step(storeBundleRouting);
00953 sqlite3_reset(storeBundleRouting);
00954 if(err == SQLITE_CONSTRAINT)
00955 cerr << "warning: BundleRoutingInformations for "<<bundleID.toString() <<" are either already in the storage or there is no Bundle with this BundleID."<<endl;
00956 else if(err != SQLITE_DONE){
00957 stringstream error;
00958 error << "SQLiteBundleStorage: storeBundleRoutingInfo: " << err << " errmsg: " << err;
00959 std::cerr << error.str();
00960 throw SQLiteQuerryException(error.str());
00961 }
00962 }
00963 }
00964
00965 void SQLiteBundleStorage::storeNodeRoutingInfo(const data::EID &node, const int &key, const std::string &routingInfo){
00966 int err;
00967 {
00968 ibrcommon::MutexLock lock(dbMutex);
00969 sqlite3_bind_text(storeNodeRouting,1,node.getString().c_str(),node.getString().length(), SQLITE_TRANSIENT);
00970 sqlite3_bind_int(storeNodeRouting,2,key);
00971 sqlite3_bind_text(storeNodeRouting,3,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00972 err = sqlite3_step(storeNodeRouting);
00973 sqlite3_reset(storeNodeRouting);
00974 if(err == SQLITE_CONSTRAINT)
00975 cerr << "warning: NodeRoutingInfo for "<<node.getString() <<" are already in the storage."<<endl;
00976 else if(err != SQLITE_DONE){
00977 stringstream error;
00978 error << "SQLiteBundleStorage: storeNodeRoutingInfo: " << err << " errmsg: " << err;
00979 std::cerr << error.str();
00980 throw SQLiteQuerryException(error.str());
00981 }
00982 }
00983 }
00984
00985 void SQLiteBundleStorage::storeRoutingInfo(const int &key, const std::string &routingInfo){
00986 int err;
00987 {
00988 ibrcommon::MutexLock lock(dbMutex);
00989 sqlite3_bind_int(storeRouting,1,key);
00990 sqlite3_bind_text(storeRouting,2,routingInfo.c_str(),routingInfo.length(), SQLITE_TRANSIENT);
00991 err = sqlite3_step(storeRouting);
00992 sqlite3_reset(storeRouting);
00993 if(err == SQLITE_CONSTRAINT)
00994 cerr << "warning: There are already RoutingInformation refereed by this Key"<<endl;
00995 else if(err != SQLITE_DONE){
00996 stringstream error;
00997 error << "SQLiteBundleStorage: storeRoutingInfo: " << err << " errmsg: " << err;
00998 std::cerr << error.str();
00999 throw SQLiteQuerryException(error.str());
01000 }
01001 }
01002 }
01003
01004 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id){
01005 int err;
01006 string filename;
01007 list<const char*> blocklist;
01008 list<const char*>::iterator it;
01009
01010 {
01011 ibrcommon::MutexLock lock(dbMutex);
01012 sqlite3_bind_text(getBlocksByID, 1, id.toString().c_str(), id.toString().length(),SQLITE_TRANSIENT);
01013 err = sqlite3_step(getBlocksByID);
01014 if(err == SQLITE_DONE)
01015 return;
01016 while (err == SQLITE_ROW){
01017 filename = (const char*)sqlite3_column_text(getBlocksByID,0);
01018 deleteList.push_front(filename);
01019 err = sqlite3_step(getBlocksByID);
01020 }
01021 sqlite3_reset(getBlocksByID);
01022 if(err != SQLITE_DONE){
01023 stringstream error;
01024 error << "SQLiteBundleStorage: failure while getting filename: " << id.toString() << " errmsg: " << err;
01025 std::cerr << error.str();
01026 throw SQLiteQuerryException(error.str());
01027 }
01028
01029 sqlite3_bind_text(removeBundle, 1, id.toString().c_str(), id.toString().length(),SQLITE_TRANSIENT);
01030 err = sqlite3_step(removeBundle);
01031 sqlite3_reset(removeBundle);
01032 if(err != SQLITE_DONE){
01033 stringstream error;
01034 error << "SQLiteBundleStorage: failure while removing: " << id.toString() << " errmsg: " << err;
01035 std::cerr << error.str();
01036 throw SQLiteQuerryException(error.str());
01037 }
01038
01039
01040
01041
01042
01043
01044
01045
01046
01047
01048
01049 updateexpiredTime();
01050 }
01051 }
01052
01053 void SQLiteBundleStorage::removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key){
01054 int err;
01055 ibrcommon::MutexLock lock(dbMutex);
01056 sqlite3_bind_text(removeBundleRouting,1,bundleID.toString().c_str(),bundleID.toString().length(), SQLITE_TRANSIENT);
01057 sqlite3_bind_int(removeBundleRouting,2,key);
01058 err = sqlite3_step(removeBundleRouting);
01059 sqlite3_reset(removeBundleRouting);
01060 if(err != SQLITE_DONE){
01061 stringstream error;
01062 error << "SQLiteBundleStorage: removeBundleRoutingInfo: " << err << " errmsg: " << err;
01063 std::cerr << error.str();
01064 throw SQLiteQuerryException(error.str());
01065 }
01066 }
01067
01068 void SQLiteBundleStorage::removeNodeRoutingInfo(const data::EID &eid, const int &key){
01069 int err;
01070 ibrcommon::MutexLock lock(dbMutex);
01071 sqlite3_bind_text(removeNodeRouting,1,eid.getString().c_str(),eid.getString().length(), SQLITE_TRANSIENT);
01072 sqlite3_bind_int(removeNodeRouting,2,key);
01073 err = sqlite3_step(removeNodeRouting);
01074 sqlite3_reset(removeNodeRouting);
01075 if(err != SQLITE_DONE){
01076 stringstream error;
01077 error << "SQLiteBundleStorage: removeNodeRoutingInfo: " << err << " errmsg: " << err;
01078 std::cerr << error.str();
01079 throw SQLiteQuerryException(error.str());
01080 }
01081 }
01082
01083 void SQLiteBundleStorage::removeRoutingInfo(const int &key){
01084 int err;
01085 ibrcommon::MutexLock lock(dbMutex);
01086 sqlite3_bind_int(removeRouting,1,key);
01087 err = sqlite3_step(removeRouting);
01088 sqlite3_reset(removeRouting);
01089 if(err != SQLITE_DONE){
01090 stringstream error;
01091 error << "SQLiteBundleStorage: removeRoutingInfo: " << err << " errmsg: " << err;
01092 std::cerr << error.str();
01093 throw SQLiteQuerryException(error.str());
01094 }
01095 }
01096
01097 void SQLiteBundleStorage::clearAll(){
01098 ibrcommon::MutexLock lock(dbMutex);
01099 sqlite3_step(clearBundles);
01100 sqlite3_reset(clearBundles);
01101 sqlite3_step(clearFragments);
01102 sqlite3_reset(clearFragments);
01103 sqlite3_step(clearRouting);
01104 sqlite3_reset(clearRouting);
01105 sqlite3_step(clearNodeRouting);
01106 sqlite3_reset(clearNodeRouting);
01107 sqlite3_step(clearBlocks);
01108 sqlite3_reset(clearBlocks);
01109 if(SQLITE_DONE != sqlite3_step(vacuum)){
01110 sqlite3_reset(vacuum);
01111 throw SQLiteQuerryException("SQLiteBundleStore: clear(): vacuum failed.");
01112 }
01113 sqlite3_reset(vacuum);
01114
01115 {
01116 stringstream rm,mkdir;
01117 rm << "rm -r " << dbPath << "/" << SQL_TABLE_BLOCK;
01118 mkdir << "mkdir "<< dbPath << "/" << SQL_TABLE_BLOCK;
01119 system(rm.str().c_str());
01120 system(mkdir.str().c_str());
01121 }
01122
01123
01124 {
01125 stringstream rm,mkdir;
01126 rm << "rm -r " << dbPath << "/" << SQL_TABLE_FRAGMENT;
01127 mkdir << "mkdir "<< dbPath << "/" << SQL_TABLE_FRAGMENT;
01128 system(rm.str().c_str());
01129 system(mkdir.str().c_str());
01130 }
01131 nextExpiredTime = 0;
01132 }
01133
01134 void SQLiteBundleStorage::clear(){
01135 ibrcommon::MutexLock lock(dbMutex);
01136 sqlite3_step(clearBundles);
01137 sqlite3_reset(clearBundles);
01138 sqlite3_step(clearFragments);
01139 sqlite3_reset(clearFragments);
01140 sqlite3_step(clearBlocks);
01141 sqlite3_reset(clearBlocks);
01142 if(SQLITE_DONE != sqlite3_step(vacuum)){
01143 sqlite3_reset(vacuum);
01144 throw SQLiteQuerryException("SQLiteBundleStore: clear(): vacuum failed.");
01145 }
01146 sqlite3_reset(vacuum);
01147
01148 {
01149 stringstream rm,mkdir;
01150 rm << "rm -r " << dbPath << "/" << SQL_TABLE_BLOCK;
01151 mkdir << "mkdir "<< dbPath << "/" << SQL_TABLE_BLOCK;
01152 system(rm.str().c_str());
01153 system(mkdir.str().c_str());
01154 }
01155
01156
01157 {
01158 stringstream rm,mkdir;
01159 rm << "rm -r " << dbPath << "/" << SQL_TABLE_FRAGMENT;
01160 mkdir << "mkdir "<< dbPath << "/" << SQL_TABLE_FRAGMENT;
01161 system(rm.str().c_str());
01162 system(mkdir.str().c_str());
01163 }
01164 nextExpiredTime = 0;
01165 }
01166
01167
01168
01169 bool SQLiteBundleStorage::empty(){
01170 ibrcommon::MutexLock lock(dbMutex);
01171 if (SQLITE_DONE == sqlite3_step(getROWID)){
01172 sqlite3_reset(getROWID);
01173 return true;
01174 }
01175 else{
01176 sqlite3_reset(getROWID);
01177 return false;
01178 }
01179 }
01180
01181 unsigned int SQLiteBundleStorage::count(){
01182 ibrcommon::MutexLock lock(dbMutex);
01183 int rows = 0,err;
01184 err = sqlite3_step(countEntries);
01185 if(err == SQLITE_ROW){
01186 rows = sqlite3_column_int(countEntries,0);
01187 }
01188 sqlite3_reset(countEntries);
01189 if (err != SQLITE_ROW){
01190 stringstream error;
01191 error << "SQLiteBundleStorage: count: failure " << err << " " << sqlite3_errmsg(database);
01192 std::cerr << error.str();
01193 throw SQLiteQuerryException(error.str());
01194 }
01195 return rows;
01196 }
01197
01198 int SQLiteBundleStorage::occupiedSpace(){
01199 int size(0);
01200 DIR *dir;
01201 struct dirent *directory;
01202 struct stat aktFile;
01203 dir = opendir((dbPath+"/"+_tables[SQL_TABLE_BLOCK]).c_str());
01204 if(dir == NULL){
01205 cerr << "occupiedSpace: unable to open Directory " << dbPath+"/"+ _tables[SQL_TABLE_BLOCK];
01206 return -1;
01207 }
01208 directory = readdir(dir);
01209 if(dir == NULL){
01210 cerr << "occupiedSpace: unable to read Directory " << dbPath+"/"+ _tables[SQL_TABLE_BLOCK];
01211 return -1;
01212 }
01213 while (directory != 0){
01214 stringstream filename;
01215 if(strcmp(directory->d_name,".") && strcmp(directory->d_name, "..")){
01216 filename << dbPath << "/"<<SQL_TABLE_BLOCK<<"/"<< directory->d_name;
01217 stat(filename.str().c_str(),&aktFile);
01218 size += aktFile.st_size;
01219 }
01220 directory = readdir(dir);
01221 }
01222 closedir(dir);
01223 dir = opendir((dbPath+"/"+_tables[SQL_TABLE_FRAGMENT]).c_str());
01224 if(dir == NULL){
01225 cerr << "occupiedSpace: unable to open Directory " << dbPath+"/"+ _tables[SQL_TABLE_BLOCK];
01226 return -1;
01227 }
01228 while (directory != 0){
01229 stringstream filename;
01230 if(strcmp(directory->d_name,".") && strcmp(directory->d_name, "..")){
01231 filename << dbPath << "/"<<SQL_TABLE_FRAGMENT<<"/"<< directory->d_name;
01232 stat(filename.str().c_str(),&aktFile);
01233 size += aktFile.st_size;
01234 }
01235 directory = readdir(dir);
01236 }
01237
01238 stat((dbPath+'/'+dbFile).c_str(),&aktFile);
01239 size += aktFile.st_size;
01240
01241 closedir(dir);
01242 return size;
01243 }
01244
01245 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle){
01246 int err, filedescriptor = -1;
01247 size_t bitcounter = 0;
01248 bool allFragmentsReceived(true);
01249 size_t payloadsize, TTL, fragmentoffset;
01250 fstream datei;
01251 list<const dtn::data::Block*> blocklist;
01252 list<const dtn::data::Block*>:: const_iterator it;
01253 stringstream path,path2;
01254 string sourceEID, destination, payloadfilename, filename;
01255 char *tmp = strdup(path.str().c_str());
01256 std::string bundleID = dtn::data::BundleID(bundle).toString();
01257
01258 destination = bundle._destination.getString();
01259 sourceEID = bundle._source.getString();
01260 TTL = bundle._timestamp + bundle._lifetime;
01261 path << dbPath << "/" << SQL_TABLE_FRAGMENT << "/fragXXXXXX";
01262 path2 << dbPath << "/" << SQL_TABLE_BLOCK << "/blockXXXXXX";
01263
01264 const dtn::data::PayloadBlock &block = bundle.getBlock<dtn::data::PayloadBlock>();
01265 ibrcommon::BLOB::Reference payloadBlob = block.getBLOB();
01266 {
01267 ibrcommon::MutexLock reflock(payloadBlob);
01268 payloadsize = payloadBlob.getSize();
01269 }
01270 {
01271 ibrcommon::MutexLock lock(dbMutex);
01272
01273 bool last = bundle._fragmentoffset + payloadsize == bundle._appdatalength;
01274 bool first = bundle._fragmentoffset == 0;
01275 if(first || last){
01276 int i,blocknumber;
01277 blocklist = bundle.getBlocks();
01278
01279 for(it = blocklist.begin(), i = 0; it != blocklist.end(); it++,i++){
01280 char *temp = strdup(path2.str().c_str());
01281 filedescriptor = mkstemp(temp);
01282 if (filedescriptor == -1){
01283 free(temp);
01284 throw ibrcommon::IOException("Could not create file.");
01285 }
01286 close(filedescriptor);
01287
01288 datei.open(temp,ios::out|ios::binary);
01289 dtn::data::SeparateSerializer serializer(datei);
01290 serializer << (*(*it));
01291 datei.close();
01292 filename=temp;
01293 free(temp);
01294
01295
01296
01297
01298
01299
01300
01301 sqlite3_bind_text(storeBlock,1,bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01302 sqlite3_bind_int(storeBlock,2,(int)((*it)->getType()));
01303 sqlite3_bind_text(storeBlock,3,filename.c_str(), filename.length(),SQLITE_TRANSIENT);
01304
01305 blocknumber = blocklist.size() - i;
01306 if(first) {blocknumber = (blocklist.size() - i)*(-1);}
01307
01308 sqlite3_bind_int(storeBlock,4,blocknumber);
01309 err = sqlite3_step(storeBlock);
01310 sqlite3_reset(storeBlock);
01311 if(err != SQLITE_DONE){
01312 stringstream error;
01313 error << "SQLiteBundleStorage: storeBlocks() failure: "<< err << " " << sqlite3_errmsg(database);
01314 std::cerr << error.str();
01315 throw SQLiteQuerryException(error.str());
01316 }
01317 }
01318 }
01319
01320
01321
01322 sqlite3_bind_text(getFragements,1,sourceEID.c_str(),sourceEID.length(),SQLITE_TRANSIENT);
01323 sqlite3_bind_int64(getFragements,2,bundle._timestamp);
01324 sqlite3_bind_int(getFragements,3,bundle._sequencenumber);
01325 err = sqlite3_step(getFragements);
01326 bitcounter = 0;
01327
01328
01329 if(err == SQLITE_DONE){
01330 filedescriptor = mkstemp(tmp);
01331 if (filedescriptor == -1){
01332 free(tmp);
01333 throw ibrcommon::IOException("Could not create file.");
01334 }
01335 close(filedescriptor);
01336 payloadfilename = tmp;
01337 }
01338
01339 else if (err == SQLITE_ROW){
01340 payloadfilename = (const char*)sqlite3_column_text(getFragements,10);
01341 }
01342 free(tmp);
01343
01344 while (err == SQLITE_ROW){
01345
01346
01347 fragmentoffset = sqlite3_column_int(getFragements,7);
01348
01349 if(bitcounter >= fragmentoffset){
01350 bitcounter += fragmentoffset - bitcounter + sqlite3_column_int(getFragements,8);
01351 }
01352 else if(bitcounter >= bundle._fragmentoffset){
01353 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01354 }
01355 else{
01356 allFragmentsReceived = false;
01357 }
01358 err = sqlite3_step(getFragements);
01359 }
01360 if(bitcounter +1 >= bundle._fragmentoffset && allFragmentsReceived)
01361 bitcounter += bundle._fragmentoffset - bitcounter + payloadsize;
01362
01363 sqlite3_reset(getFragements);
01364 if(err != SQLITE_DONE){
01365 stringstream error;
01366 error << "SQLiteBundleStorage: storeFragment: " << err << " errmsg: " << sqlite3_errmsg(database);
01367 std::cerr << error.str();
01368 throw SQLiteQuerryException(error.str());
01369 }
01370
01371 datei.open(payloadfilename.c_str(),ios::in|ios::out|ios::binary);
01372 datei.seekp(bundle._fragmentoffset);
01373 {
01374 ibrcommon::MutexLock reflock(payloadBlob);
01375
01376 size_t ret = 0;
01377 istream &s = (*payloadBlob);
01378 s.seekg(0);
01379
01380 while (true)
01381 {
01382 char buf;
01383 s.get(buf);
01384 if(!s.eof()){
01385 datei.put(buf);
01386 ret++;
01387 }
01388 else
01389 break;
01390 }
01391 datei.close();
01392 }
01393
01394
01395 if(bundle._appdatalength == bitcounter){
01396
01397
01398
01399
01400
01401
01402
01403 ibrcommon::File file(payloadfilename);
01404 ibrcommon::BLOB::Reference ref(ibrcommon::FileBLOB::create(file));
01405 dtn::data::PayloadBlock pb(ref);
01406
01407 if(pb.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS)){
01408 std::list<dtn::data::EID> eidlist;
01409 std::list<dtn::data::EID>::iterator eidIt;
01410 eidlist = block.getEIDList();
01411 for(eidIt = eidlist.begin(); eidIt != eidlist.end(); eidIt++){
01412 pb.addEID(*eidIt);
01413 }
01414 }
01415
01416 pb.set(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT, block.get(dtn::data::PayloadBlock::REPLICATE_IN_EVERY_FRAGMENT));
01417 pb.set(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::TRANSMIT_STATUSREPORT_IF_NOT_PROCESSED));
01418 pb.set(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DELETE_BUNDLE_IF_NOT_PROCESSED));
01419 pb.set(dtn::data::PayloadBlock::LAST_BLOCK, block.get(dtn::data::PayloadBlock::LAST_BLOCK));
01420 pb.set(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED, block.get(dtn::data::PayloadBlock::DISCARD_IF_NOT_PROCESSED));
01421 pb.set(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED, block.get(dtn::data::PayloadBlock::FORWARDED_WITHOUT_PROCESSED));
01422 pb.set(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS, block.get(dtn::data::PayloadBlock::BLOCK_CONTAINS_EIDS));
01423
01424
01425 block.getLength();
01426
01427
01428 char *temp = strdup(path2.str().c_str());
01429 filedescriptor = mkstemp(temp);
01430 if (filedescriptor == -1){
01431 free(temp);
01432 throw ibrcommon::IOException("Could not create file.");
01433 }
01434 close(filedescriptor);
01435
01436 datei.open(temp,ios::out|ios::binary);
01437 dtn::data::SeparateSerializer serializer(datei);
01438 serializer << (pb);
01439 datei.close();
01440 filename=temp;
01441 free(temp);
01442
01443 sqlite3_bind_text(storeBlock,1,bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01444 sqlite3_bind_int(storeBlock,2,(int)((*it)->getType()));
01445 sqlite3_bind_text(storeBlock,3,filename.c_str(), filename.length(),SQLITE_TRANSIENT);
01446 sqlite3_bind_int(storeBlock,4,0);
01447 err = sqlite3_step(storeBlock);
01448 sqlite3_reset(storeBlock);
01449 if(err != SQLITE_DONE){
01450 stringstream error;
01451 error << "SQLiteBundleStorage: storeBlocks() failure: "<< err << " " << sqlite3_errmsg(database);
01452 std::cerr << error.str();
01453 throw SQLiteQuerryException(error.str());
01454 }
01455
01456
01457 size_t procFlags = bundle._procflags;
01458 procFlags &= ~(dtn::data::PrimaryBlock::FRAGMENT);
01459
01460 sqlite3_bind_text(store_Bundle, 1, bundleID.c_str(), bundleID.length(),SQLITE_TRANSIENT);
01461 sqlite3_bind_text(store_Bundle, 2,sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01462 sqlite3_bind_text(store_Bundle, 3,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01463 sqlite3_bind_text(store_Bundle, 4,bundle._reportto.getString().c_str(), bundle._reportto.getString().length(),SQLITE_TRANSIENT);
01464 sqlite3_bind_text(store_Bundle, 5,bundle._custodian.getString().c_str(), bundle._custodian.getString().length(),SQLITE_TRANSIENT);
01465 sqlite3_bind_int(store_Bundle,6,procFlags);
01466 sqlite3_bind_int64(store_Bundle,7,bundle._timestamp);
01467 sqlite3_bind_int(store_Bundle,8,bundle._sequencenumber);
01468 sqlite3_bind_int64(store_Bundle,9,bundle._lifetime);
01469 sqlite3_bind_int64(store_Bundle,10,bundle._fragmentoffset);
01470 sqlite3_bind_int64(store_Bundle,11,bundle._appdatalength);
01471 sqlite3_bind_int64(store_Bundle,12,TTL);
01472 sqlite3_bind_int(store_Bundle,13,size);
01473 err = sqlite3_step(store_Bundle);
01474 if(err == SQLITE_CONSTRAINT)
01475 cerr << "warning: Bundle is already in the storage"<<endl;
01476 else if(err != SQLITE_DONE){
01477 stringstream error;
01478 error << "SQLiteBundleStorage: store() failure: "<< err << " " << sqlite3_errmsg(database);
01479 std::cerr << error.str();
01480 throw SQLiteQuerryException(error.str());
01481 }
01482 sqlite3_reset(store_Bundle);
01483 }
01484
01485
01486 else{
01487 sqlite3_bind_text(store_Fragment, 1, sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
01488 sqlite3_bind_int64(store_Fragment,2,bundle._timestamp);
01489 sqlite3_bind_int(store_Fragment,3,bundle._sequencenumber);
01490 sqlite3_bind_text(store_Fragment, 4,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
01491 sqlite3_bind_int64(store_Fragment,5,TTL);
01492 sqlite3_bind_int(store_Fragment,6,priority);
01493 sqlite3_bind_int64(store_Fragment,7,bundle._fragmentoffset);
01494 sqlite3_bind_int64(store_Fragment,8,payloadsize);
01495 sqlite3_bind_text(store_Fragment,9,payloadfilename.c_str() ,payloadfilename.length(),SQLITE_TRANSIENT);
01496 err = sqlite3_step(store_Fragment);
01497 sqlite3_reset(store_Fragment);
01498 if(err != SQLITE_DONE){
01499 stringstream error;
01500 error << "SQLiteBundleStorage: storeFragment() failure: "<< err << " " << sqlite3_errmsg(database);
01501 std::cerr << error.str();
01502 throw SQLiteQuerryException(error.str());
01503 }
01504 }
01505 }
01506 }
01507
01508 void SQLiteBundleStorage::raiseEvent(const Event *evt)
01509 {
01510 try {
01511 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
01512
01513 ibrcommon::MutexLock lock = ibrcommon::MutexLock(time_change);
01514 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
01515 {
01516 actual_time = time.getTimestamp();
01517
01518
01519
01520 if(actual_time >= nextExpiredTime){
01521 deleteexpired();
01522 }
01523 }
01524 } catch (const std::bad_cast&) { }
01525
01526 try {
01527 const GlobalEvent &global = dynamic_cast<const GlobalEvent&>(*evt);
01528
01529 if (global.getAction() == dtn::core::GlobalEvent::GLOBAL_SHUTDOWN)
01530 {
01531 {
01532 ibrcommon::MutexLock lock(dbMutex);
01533 global_shutdown = true;
01534 }
01535 timeeventConditional.signal();
01536 }
01537 else if(global.getAction() == GlobalEvent::GLOBAL_IDLE){
01538 idle = true;
01539 timeeventConditional.signal();
01540 }
01541 else if(global.getAction() == GlobalEvent::GLOBAL_BUSY) {
01542 idle = false;
01543 }
01544 } catch (const std::bad_cast&) { }
01545 }
01546
01547 void SQLiteBundleStorage::run(void)
01548 {
01549 ibrcommon::MutexLock l(timeeventConditional);
01550
01551 while (!global_shutdown)
01552 {
01553 idle_work();
01554 timeeventConditional.wait();
01555 }
01556 }
01557
01558 bool SQLiteBundleStorage::__cancellation()
01559 {
01560 ibrcommon::MutexLock l(timeeventConditional);
01561 global_shutdown = true;
01562 timeeventConditional.signal(true);
01563
01564
01565 return true;
01566 }
01567
01568 void SQLiteBundleStorage::deleteexpired(){
01569
01570
01571
01572
01573
01574 int err;
01575 string file_Name, source, payload_name;
01576 size_t sequence, offset, timestamp;
01577 data::BundleID ID;
01578 data::EID sourceID;
01579 list<data::BundleID> eventlist;
01580 list<data::BundleID>::iterator it;
01581 {
01582
01583 ibrcommon::MutexLock lock(dbMutex);
01584 sqlite3_bind_int64(getBundleTTL,1, actual_time);
01585 err = sqlite3_step(getBundleTTL);
01586 while (err == SQLITE_ROW){
01587 source = (const char*)sqlite3_column_text(getBundleTTL,0);
01588 timestamp = sqlite3_column_int64(getBundleTTL,1);
01589 sequence = sqlite3_column_int64(getBundleTTL,2);
01590 sourceID = data::EID(source);
01591 ID = data::BundleID(sourceID, timestamp, sequence);
01592 eventlist.push_back(ID);
01593 err = sqlite3_step(getBundleTTL);
01594 }
01595 sqlite3_reset(getBundleTTL);
01596 if(err != SQLITE_DONE){
01597 stringstream error;
01598 error << "SQLiteBundleStorage: deleteexpired() failure: "<< err << " " << sqlite3_errmsg(database);
01599 std::cerr << error.str();
01600 throw SQLiteQuerryException(error.str());
01601 }
01602
01603
01604 for(it = eventlist.begin(); it != eventlist.end(); it++){
01605 sqlite3_bind_text(getBlocksByID,1,(*it).toString().c_str(),(*it).toString().length(),SQLITE_TRANSIENT);
01606 err = sqlite3_step(getBlocksByID);
01607
01608 while(err == SQLITE_ROW){
01609 file_Name = (const char*)sqlite3_column_text(getBlocksByID,0);
01610 deleteList.push_back(file_Name);
01611 err = sqlite3_step(getBlocksByID);
01612 }
01613 sqlite3_reset(getBlocksByID);
01614
01615
01616 if(err != SQLITE_DONE){
01617 stringstream error;
01618 error << "SQLiteBundleStorage deleteexpired() failure: while getting Blocks " << err << " "<< sqlite3_errmsg(database);
01619 std::cerr << error.str();
01620 throw SQLiteQuerryException(error.str());
01621 }
01622 }
01623
01624
01625 sqlite3_bind_int64(getFragmentTTL,1, actual_time);
01626 err = sqlite3_step(getFragmentTTL);
01627 while (err == SQLITE_ROW ){
01628 source = (const char*)sqlite3_column_text(getFragmentTTL,0);
01629 timestamp = sqlite3_column_int64(getFragmentTTL,1);
01630 sequence = sqlite3_column_int64(getFragmentTTL,2);
01631 offset = sqlite3_column_int64(getFragmentTTL,3);
01632 file_Name = (const char*)sqlite3_column_text(getFragmentTTL,4);
01633 payload_name = (const char*)sqlite3_column_text(getFragmentTTL,5);
01634
01635 sourceID = data::EID((string)source);
01636 ID = data::BundleID(sourceID, timestamp, sequence, true, offset);
01637 eventlist.push_back(ID);
01638 deleteList.push_back(file_Name);
01639 deleteList.push_back(payload_name);
01640 err = sqlite3_step(getFragmentTTL);
01641 }
01642 sqlite3_reset(getFragmentTTL);
01643 if(err != SQLITE_DONE){
01644 stringstream error;
01645 error << "SQLiteBundleStorage deleteexpired() failure: while getting Fragments " << err << " "<< sqlite3_errmsg(database);
01646 std::cerr << error.str();
01647 throw SQLiteQuerryException(error.str());
01648 }
01649
01650
01651 sqlite3_bind_int64(deleteFragementTTL,1, actual_time);
01652 err = sqlite3_step(deleteFragementTTL);
01653 sqlite3_reset(deleteFragementTTL);
01654 if(err != SQLITE_DONE){
01655 stringstream error;
01656 error << "SQLiteBundleStorage deleteexpired() failure: while deleting Fragments " << err << " "<< sqlite3_errmsg(database);
01657 std::cerr << error.str();
01658 throw SQLiteQuerryException(error.str());
01659 }
01660
01661
01662 sqlite3_bind_int64(deleteBundleTTL,1, actual_time);
01663 err = sqlite3_step(deleteBundleTTL);
01664 sqlite3_reset(deleteBundleTTL);
01665 if(err != SQLITE_DONE){
01666 stringstream error;
01667 error << "SQLiteBundleStorage deleteexpired() failure: while deleting Bundles " << err << " "<< sqlite3_errmsg(database);
01668 std::cerr << error.str();
01669 throw SQLiteQuerryException(error.str());
01670 }
01671 }
01672
01673
01674 updateexpiredTime();
01675
01676
01677 for(it = eventlist.begin(); it != eventlist.end(); it++){
01678 BundleExpiredEvent::raise((*it));
01679 }
01680 }
01681
01682 void SQLiteBundleStorage::updateexpiredTime(){
01683 int err;
01684 size_t time;
01685 err = sqlite3_step(getnextExpiredBundle);
01686 if (err == SQLITE_ROW)
01687 time = sqlite3_column_int64(getnextExpiredBundle,0);
01688 sqlite3_reset(getnextExpiredBundle);
01689
01690 err = sqlite3_step(getnextExpiredFragment);
01691 if (err == SQLITE_ROW){
01692 if(time > sqlite3_column_int64(getnextExpiredFragment,0))
01693 time = sqlite3_column_int64(getnextExpiredFragment,0);
01694 }
01695 sqlite3_reset(getnextExpiredFragment);
01696 nextExpiredTime = time;
01697 }
01698
01699 int SQLiteBundleStorage::storeBlocks(const data::Bundle &bundle){
01700 const list<const data::Block*> blocklist(bundle.getBlocks());
01701 list<const data::Block*>::const_iterator it;
01702 int filedescriptor(-1), blocktyp, err, blocknumber(1), storedBytes(0);
01703 fstream filestream;
01704 stringstream path;
01705 path << dbPath << "/" << SQL_TABLE_BLOCK << "/blockXXXXXX";
01706 data::BundleID ID(bundle);
01707
01708 for(it = blocklist.begin() ;it != blocklist.end(); it++){
01709 filedescriptor = -1;
01710 blocktyp = (int)(*it)->getType();
01711 char *blockfilename;
01712
01713
01714 blockfilename = strdup(path.str().c_str());
01715 filedescriptor = mkstemp(blockfilename);
01716 if (filedescriptor == -1)
01717 throw ibrcommon::IOException("Could not create file.");
01718 close(filedescriptor);
01719
01720 filestream.open(blockfilename,ios_base::out|ios::binary);
01721 dtn::data::SeparateSerializer serializer(filestream);
01722 serializer << (*(*it));
01723 filestream.close();
01724
01725
01726 sqlite3_bind_text(storeBlock,1,ID.toString().c_str(), ID.toString().length(),SQLITE_TRANSIENT);
01727 sqlite3_bind_int(storeBlock,2,blocktyp);
01728 sqlite3_bind_text(storeBlock,3,blockfilename, strlen(blockfilename),SQLITE_TRANSIENT);
01729 sqlite3_bind_int(storeBlock,4,blocknumber);
01730 err = sqlite3_step(storeBlock);
01731 sqlite3_reset(storeBlock);
01732 if(err != SQLITE_DONE){
01733 stringstream error;
01734 error << "SQLiteBundleStorage: storeBlocks() failure: "<< err << " " << sqlite3_errmsg(database);
01735 std::cerr << error.str();
01736 throw SQLiteQuerryException(error.str());
01737 }
01738
01739
01740 blocknumber++;
01741 free(blockfilename);
01742 }
01743
01744 return storedBytes;
01745 }
01746
01747 int SQLiteBundleStorage::readBlocks(data::Bundle &bundle, const string &bundleID){
01748 int err = 0;
01749 string file;
01750 list<string> filenames;
01751
01752 fstream filestream;
01753
01754
01755 err = sqlite3_bind_text(getBlocksByID,1,bundleID.c_str(),bundleID.length(),SQLITE_TRANSIENT);
01756 if(err != SQLITE_OK)
01757 cerr << "bind error"<<endl;
01758 err = sqlite3_step(getBlocksByID);
01759 if(err == SQLITE_DONE){
01760 cerr << "readBlocks: no Blocks found for: " << bundleID <<endl;
01761 }
01762 while(err == SQLITE_ROW){
01763 file = (const char*)sqlite3_column_text(getBlocksByID,0);
01764 filenames.push_back(file);
01765 err = sqlite3_step(getBlocksByID);
01766 }
01767 if(err != SQLITE_DONE){
01768 stringstream error;
01769 error << "SQLiteBundleStorage: readBlocks() failure: "<< err << " " << sqlite3_errmsg(database);
01770 std::cerr << error.str();
01771 throw SQLiteQuerryException(error.str());
01772 }
01773 sqlite3_reset(getBlocksByID);
01774
01775
01776 for(std::list<string>::const_iterator it = filenames.begin(); it != filenames.end(); it++){
01777 std::ifstream is((*it).c_str(), ios::binary);
01778 dtn::data::SeparateDeserializer deserializer(is, bundle);
01779 deserializer.readBlock();
01780 is.close();
01781 }
01782 return 0;
01783 }
01784
01785 void SQLiteBundleStorage::idle_work(){
01786 int err;
01787 list<string>::iterator it;
01788 it = deleteList.begin();
01789 while(it != deleteList.end() || idle){
01790 ::remove((*it).c_str());
01791 }
01792
01793
01794
01795
01796
01797
01798
01799
01800
01801 if(idle){
01802 err = sqlite3_step(vacuum);
01803 sqlite3_reset(vacuum);
01804 }
01805 }
01806
01807 list<data::Block> SQLiteBundleStorage::getBlock(const data::BundleID &bundleID,const char &blocktype){
01808 int err;
01809 stringstream ss;
01810 ss << blocktype;
01811 list<data::Block> result;
01812 fstream filestream;
01813 string filename;
01814 err = sqlite3_bind_text(getBlocks,1, bundleID.toString().c_str(), bundleID.toString().length(), SQLITE_TRANSIENT);
01815 if(err != SQLITE_OK)
01816 cerr << "bind error"<<endl;
01817 err = sqlite3_bind_int(getBlocks,2,atoi(ss.str().c_str()));
01818 if(err != SQLITE_OK)
01819 cerr << "bind error"<<endl;
01820 err = sqlite3_step(getBlocks);
01821 while (err == SQLITE_ROW){
01822 filename = (const char*)sqlite3_column_text(getBlocks,0);
01823
01824 std::ifstream is(filename.c_str(),ios::binary);
01825 dtn::data::SeparateDeserializer deserializer(is, bundle);
01826 deserializer.readBlock();
01827 is.close();
01828
01829 err = sqlite3_step(getBlocks);
01830 }
01831 sqlite3_reset(getBlocks);
01832 if(err != SQLITE_DONE ){
01833 stringstream error;
01834 error << "SQLiteBundleStorage: getBlock() failure: "<< err << " " << sqlite3_errmsg(database);
01835 std::cerr << error.str();
01836 throw SQLiteQuerryException(error.str());
01837 }
01838 return result;
01839 }
01840
01841 const std::string SQLiteBundleStorage::getName() const
01842 {
01843 return "SQLiteBundleStorage";
01844 }
01845
01846
01847
01848
01849
01850
01851
01852
01853
01854
01855
01856 void SQLiteBundleStorage::releaseCustody(dtn::data::BundleID&)
01857 {
01858
01859
01860 }
01861 }
01862 }