00001
00002
00003
00004
00005
00006
00007
00008 #include "core/SQLiteBundleStorage.h"
00009 #include "core/TimeEvent.h"
00010 #include "core/GlobalEvent.h"
00011 #include "ibrcommon/thread/MutexLock.h"
00012 #include "ibrdtn/data/Bundle.h"
00013 #include "ibrdtn/data/BundleID.h"
00014 #include <iostream>
00015 #include "core/BundleCore.h"
00016 #include <list>
00017 #include <stdio.h>
00018
00019 #include <vector>
00020
00021 namespace dtn {
00022 namespace core {
00023
00024 SQLiteBundleStorage::SQLiteBundleStorage(ibrcommon::File dbPath ,string dbFile , int size):
00025 dbPath(dbPath), dbFile(dbFile), dbSize(size), _BundleTable("Bundles"), global_shutdown(false), _FragmentTable("Fragments"){
00026 int err,filename , err2;
00027 list<int> inconsistentList;
00028 list<int>::iterator it;
00029
00030
00031 ibrcommon::File db = dbPath.get(dbFile);
00032
00033
00034 err = sqlite3_open_v2(db.getPath().c_str(),&database,SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
00035 if (err){
00036 std::cerr << "Can't open database: " << sqlite3_errmsg(database);
00037 sqlite3_close(database);
00038 throw "Unable to open SQLite Database";
00039 }
00040
00041
00042 sqlite3_stmt *createBundleTable = prepareStatement("create table if not exists "+_BundleTable+" (Filename INTEGER PRIMARY KEY ASC,BundleID text, Destination text, TTL int, Priority int);");
00043 err = sqlite3_step(createBundleTable);
00044 if(err != SQLITE_DONE){
00045 std::cerr << "SQLiteBundleStorage: Constructorfailure: createTable failed " << err;
00046 }
00047 sqlite3_finalize(createBundleTable);
00048
00049
00050 sqlite3_stmt *createFragmentTable = prepareStatement("create table if not exists "+_FragmentTable+" (Filename INTEGER PRIMARY KEY ASC, Source text, Timestamp int, Sequencenumber int, Destination text, TTL int, Priority int, FragementationOffset int, PayloadLength int, file int);");
00051 err = sqlite3_step(createFragmentTable);
00052 if(err != SQLITE_DONE){
00053 std::cerr << "SQLiteBundleStorage: Constructorfailure: createTable failed " << err;
00054 }
00055 sqlite3_finalize(createFragmentTable);
00056
00057
00058 sqlite3_stmt *conistencycheck = prepareStatement("SELECT ROWID FROM "+_BundleTable+";");
00059 stringstream name;
00060 for(int i = sqlite3_step(conistencycheck); i == SQLITE_ROW; i=sqlite3_step(conistencycheck)){
00061 filename = sqlite3_column_int(conistencycheck,0);
00062
00063 stringstream ss; ss << filename;
00064 ibrcommon::File datafile = dbPath.get(ss.str());
00065 FILE *fpointer = fopen(datafile.getPath().c_str(),"r");
00066 if (fpointer == NULL){
00067
00068 inconsistentList.push_front(filename);
00069 cerr << "datenbank inkonsistent" << endl;
00070 }
00071 else{
00072 fclose(fpointer);
00073 }
00074 }
00075 sqlite3_finalize(conistencycheck);
00076
00077
00078
00079
00080 sqlite3_stmt *deleteBundle = prepareStatement("DELETE FROM "+_BundleTable+" WHERE ROWID = ?;");
00081 for(it = inconsistentList.begin(); it != inconsistentList.end(); it++){
00082
00083 sqlite3_bind_int(deleteBundle,1,(*it));
00084 err = sqlite3_step(deleteBundle);
00085 if ( err != SQLITE_DONE )
00086 {
00087 std::cerr << "SQLiteBundlestorage: failure in prepareStatement: " << err << std::endl;
00088 }
00089 sqlite3_reset(deleteBundle);
00090 }
00091 sqlite3_finalize(deleteBundle);
00092
00093
00094 getTTL = prepareStatement("SELECT TTL FROM "+_BundleTable+" ASC;");
00095
00096 getBundleByDestination = prepareStatement("SELECT ROWID FROM "+_BundleTable+" WHERE Destination = ? ORDER BY TTL ASC;");
00097 getBundleByID = prepareStatement("SELECT ROWID FROM "+_BundleTable+" WHERE BundleID = ?;");
00098 getFragements = prepareStatement("SELECT * FROM "+_FragmentTable+" WHERE Source = ? AND Timestamp = ? ORDER BY FragementationOffset ASC;");
00099 store_Bundle = prepareStatement("INSERT INTO "+_BundleTable+" (BundleID, Destination, TTL, Priority) VALUES (?,?,?,?);");
00100 store_Fragment = prepareStatement("INSERT INTO "+_FragmentTable+" (Source, Timestamp, Sequencenumber, Destination, TTL, Priority, FragementationOffset ,PayloadLength,file) VALUES (?,?,?,?,?,?,?,?,?);");
00101 clearStorage = prepareStatement("DELETE FROM "+_BundleTable+";");
00102 countEntries = prepareStatement("SELECT Count(ROWID) From "+_BundleTable+";");
00103 vacuum = prepareStatement("vacuum;");
00104 getROWID = prepareStatement("select ROWID from "+_BundleTable+";");
00105 removeBundle = prepareStatement("Delete From "+_BundleTable+" Where BundleID = ?;");
00106 removeFragments = prepareStatement("DELETE FROM "+_FragmentTable+" WHERE Source = ? AND Timestamp = ?;");
00107
00108
00109 bindEvent(TimeEvent::className);
00110 bindEvent(GlobalEvent::className);
00111 }
00112
00113 SQLiteBundleStorage::~SQLiteBundleStorage()
00114 {
00115 {
00116 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00117
00118
00119 sqlite3_finalize(getTTL);
00120 sqlite3_finalize(getBundleByDestination);
00121 sqlite3_finalize(getBundleByID);
00122 sqlite3_finalize(store_Bundle);
00123 sqlite3_finalize(clearStorage);
00124 sqlite3_finalize(countEntries);
00125 sqlite3_finalize(vacuum);
00126 sqlite3_finalize(getROWID);
00127 sqlite3_finalize(removeBundle);
00128
00129
00130 sqlite3_close(database);
00131 }
00132
00133 {
00134 ibrcommon::MutexLock l(timeeventConditional);
00135 global_shutdown = true;
00136 timeeventConditional.signal(true);
00137 }
00138
00139
00140 unbindEvent(TimeEvent::className);
00141 unbindEvent(GlobalEvent::className);
00142
00143 join();
00144 }
00145
00146 sqlite3_stmt* SQLiteBundleStorage::prepareStatement(string sqlquery){
00147
00148 sqlite3_stmt *statement;
00149 int err = sqlite3_prepare_v2(database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00150 if ( err != SQLITE_OK )
00151 {
00152 std::cerr << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << std::endl;
00153 }
00154 return statement;
00155 }
00156
00157 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id){
00158 int err, filename;
00159 data::Bundle bundle;
00160 stringstream completefilename;
00161 fstream datei;
00162
00163
00164 string ID = id.toString();
00165
00166 {
00167 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00168 sqlite3_bind_text(getBundleByID, 1, ID.c_str(), ID.length(),SQLITE_TRANSIENT);
00169
00170
00171 err = sqlite3_step(getBundleByID);
00172
00173
00174 if(err != SQLITE_ROW){
00175 std::cerr << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString() <<endl;
00176 }
00177 filename = sqlite3_column_int(getBundleByID, 0);
00178
00179 err = sqlite3_step(getBundleByID);
00180 if (err != SQLITE_DONE){
00181 sqlite3_reset(getBundleByID);
00182 throw "SQLiteBundleStorage: Database contains two or more Bundle with the same BundleID, the requested Bundle is maybe a Fragment";
00183 }
00184
00185 completefilename << dbPath.getPath() << "/" << filename;
00186 datei.open((completefilename.str()).c_str(), ios::in|ios::binary);
00187 datei >> bundle;
00188 datei.close();
00189
00190
00191 sqlite3_reset(getBundleByID);
00192 }
00193
00194 return bundle;
00195 }
00196
00197 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::EID &eid){
00198 dtn::data::Bundle bundel;
00199 set<dtn::data::EID>::iterator iter;
00200 while(true){
00201 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00202
00203 if(!global_shutdown){
00204
00205 iter = unblockEID.find(eid);
00206 if(iter == unblockEID.end()){
00207 if(getBundle(eid,bundel)){
00208
00209 return bundel;
00210 }
00211 else{
00212
00213 dbMutex.wait();
00214 }
00215 }
00216 else{
00217 unblockEID.erase(eid);
00218 break;
00219 }
00220 }
00221 else {
00222 break;
00223 }
00224 }
00225 throw dtn::exceptions::NoBundleFoundException();
00226 }
00227
00228 bool SQLiteBundleStorage::getBundle(const dtn::data::EID &eid, dtn::data::Bundle &bundle){
00229 int err,filename;
00230 stringstream completefilename;
00231 string ID = eid.getString();
00232
00233
00234
00235
00236 err = sqlite3_bind_text(getBundleByDestination, 1, ID.c_str(), ID.length(),SQLITE_TRANSIENT);
00237
00238
00239 err = sqlite3_step(getBundleByDestination);
00240
00241
00242 if(err != SQLITE_ROW){
00243 sqlite3_reset(getBundleByDestination);
00244 return false;
00245 }
00246 filename = sqlite3_column_int(getBundleByDestination, 0);
00247
00248 fstream datei;
00249 completefilename << dbPath.getPath() << "/" << filename;
00250 datei.open((completefilename.str()).c_str(), ios::in|ios::binary);
00251 datei >> bundle;
00252 datei.close();
00253
00254
00255 sqlite3_reset(getBundleByID);
00256
00257 return true;
00258 }
00259
00260 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle){
00261
00262
00263
00264
00265
00266
00267 int err, TTL, priority;
00268 stringstream stream_bundleid, completefilename;
00269 string bundlestring, destination;
00270
00271
00272 priority = 2 * (bundle._procflags & dtn::data::Bundle::PRIORITY_BIT2) + (bundle._procflags & dtn::data::Bundle::PRIORITY_BIT1);
00273
00274 destination = bundle._destination.getString();
00275 stream_bundleid << "[" << bundle._timestamp << "." << bundle._sequencenumber << "] " << bundle._source.getString();
00276 TTL = bundle._timestamp + bundle._lifetime;
00277
00278 {
00279 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00280 sqlite3_bind_text(store_Bundle, 1, stream_bundleid.str().c_str(), stream_bundleid.str().length(),SQLITE_TRANSIENT);
00281 sqlite3_bind_text(store_Bundle, 2,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
00282 sqlite3_bind_int(store_Bundle,3,TTL);
00283 sqlite3_bind_int(store_Bundle,4,priority);
00284 err = sqlite3_step(store_Bundle);
00285 if(err != SQLITE_DONE){
00286 std::cerr << "SQLiteBundleStorage: store() failure: "<< err << " " << sqlite3_errmsg(database) <<endl;
00287 }
00288 sqlite3_reset(store_Bundle);
00289
00290 int filename = sqlite3_last_insert_rowid(database);
00291 fstream datei;
00292 completefilename << dbPath.getPath() << "/" << filename;
00293 datei.open((completefilename.str()).c_str(), ios::out|ios::binary);
00294 datei << bundle;
00295 datei.close();
00296 }
00297
00298 dbMutex.signal(true);
00299 }
00300
00301 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id){
00302 int err, filename;
00303 stringstream file;
00304
00305 {
00306 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00307 sqlite3_bind_text(getBundleByID, 1, id.toString().c_str(), id.toString().length(),SQLITE_TRANSIENT);
00308 err = sqlite3_step(getBundleByID);
00309 if(err != SQLITE_OK){
00310 std::cerr << "SQLiteBundleStorage: failure while getting filename: " << id.toString() << " errmsg: " << err <<endl;
00311 }
00312 filename = sqlite3_column_int(getBundleByID,0);
00313 file << dbPath.getPath() << "/" << filename;
00314 sqlite3_reset(removeBundle);
00315 err = ::remove(file.str().c_str());
00316 if(err != SQLITE_OK){
00317 std::cerr << "SQLiteBundleStorage: remove():Datei konnte nicht gelöscht werden " << " errmsg: " << err <<endl;
00318 }
00319
00320 sqlite3_bind_text(removeBundle, 1, id.toString().c_str(), id.toString().length(),SQLITE_TRANSIENT);
00321 err = sqlite3_step(removeBundle);
00322 if(err != SQLITE_OK){
00323 std::cerr << "SQLiteBundleStorage: failure while removing: " << id.toString() << " errmsg: " << err <<endl;
00324 }
00325 sqlite3_reset(removeBundle);
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 err = sqlite3_step(vacuum);
00337 sqlite3_reset(vacuum);
00338 }
00339 }
00340
00341 void SQLiteBundleStorage::unblock(const dtn::data::EID &eid){
00342 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00343 unblockEID.insert(eid);
00344 dbMutex.signal(true);
00345 }
00346
00347 void SQLiteBundleStorage::clear(){
00348 char *err;
00349 {
00350 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00351 sqlite3_step(clearStorage);
00352 sqlite3_reset(clearStorage);
00353 if(SQLITE_OK != sqlite3_step(vacuum)){
00354 std::cerr << "SQLiteBundleStorage: failure while processing vacuum.";
00355 }
00356 sqlite3_reset(vacuum);
00357 }
00358 }
00359
00360 bool SQLiteBundleStorage::empty(){
00361 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00362 if (SQLITE_DONE == sqlite3_step(getROWID)){
00363 sqlite3_reset(getROWID);
00364 return true;
00365 }
00366 else{
00367 sqlite3_reset(getROWID);
00368 return false;
00369 }
00370 }
00371
00372 unsigned int SQLiteBundleStorage::count(){
00373 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00374 int rows = 0,err;
00375 err = sqlite3_step(countEntries);
00376 rows = sqlite3_column_int(countEntries,0);
00377 sqlite3_reset(countEntries);
00378 return rows;
00379 }
00380
00381
00382
00383
00384
00385 void SQLiteBundleStorage::storeFragment(const dtn::data::Bundle &bundle){
00386 int payloadsize, TTL, priority, err, filename;
00387 stringstream completefilename;
00388 string destination, sourceEID;
00389 list<dtn::data::Block*> blocklist;
00390 list<dtn::data::Block*>::iterator it;
00391
00392 destination = bundle._destination.getString();
00393 sourceEID = bundle._source.getString();
00394 TTL = bundle._timestamp + bundle._lifetime;
00395
00396
00397 priority = 2 * (bundle._procflags & dtn::data::Bundle::PRIORITY_BIT2) + (bundle._procflags & dtn::data::Bundle::PRIORITY_BIT1);
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409 ibrcommon::BLOB::Reference payloadBlob = (*it)->getBLOB();
00410 payloadsize = payloadBlob.getSize();
00411
00412
00413 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00414
00415 sqlite3_bind_text(getFragements,1, sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
00416 sqlite3_bind_int(getFragements,2,bundle._timestamp);
00417 err = sqlite3_step(getFragements);
00418 if(err == SQLITE_ERROR){
00419 std::cerr << "SQLiteBundleStorage: storeFragment() failure: "<< err << " " << sqlite3_errmsg(database) << endl;
00420 }
00421 if(err != SQLITE_ROW){
00422
00423
00424
00425
00426
00427
00428 sqlite3_reset(getFragements);
00429
00430
00431 sqlite3_bind_text(store_Fragment, 1, sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
00432 sqlite3_bind_int(store_Fragment,2,bundle._timestamp);
00433 sqlite3_bind_int(store_Fragment,3,bundle._sequencenumber);
00434 sqlite3_bind_text(store_Fragment, 4,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
00435 sqlite3_bind_int(store_Fragment,5,TTL);
00436 sqlite3_bind_int(store_Fragment,6,priority);
00437 sqlite3_bind_int(store_Fragment,7,bundle._fragmentoffset);
00438 sqlite3_bind_int(store_Fragment,8,payloadsize);
00439
00440 err = sqlite3_step(store_Fragment);
00441 if(err != SQLITE_OK){
00442 std::cerr << "SQLiteBundleStorage: storeFragment() failure: "<< err << " " << sqlite3_errmsg(database) << endl;
00443 }
00444 sqlite3_reset(store_Fragment);
00445
00446
00447 int filename = sqlite3_last_insert_rowid(database);
00448
00449 if(bundle._fragmentoffset == 0){
00450 fstream datei;
00451 completefilename << dbPath.getPath() << "/Fragment/" << filename;
00452 datei.open((completefilename.str()).c_str(), ios::binary);
00453 datei << bundle;
00454 datei.close();
00455 }
00456 else{
00457
00458 }
00459 }
00460
00461 else{
00462
00463
00464
00465
00466
00467
00468
00469
00471
00473 int fragmentoffset , bitcounter = 0;
00474 while (err != SQLITE_DONE || err == SQLITE_ERROR){
00475 fragmentoffset = sqlite3_column_int(getFragements,7);
00476
00477 if(sqlite3_column_int(getFragements,9) == 1){
00478 filename = sqlite3_column_int(getFragements,0);
00479 }
00480
00481
00482 if(bitcounter >= fragmentoffset){
00483 bitcounter = fragmentoffset - bitcounter + sqlite3_column_int(getFragements,8);
00484 }
00485 err = sqlite3_step(getFragements);
00486 }
00487
00488 fragmentoffset = bundle._fragmentoffset;
00489 if(bitcounter >= fragmentoffset){
00490 bitcounter = fragmentoffset - bitcounter + payloadsize;
00491 }
00492 sqlite3_reset(getFragements);
00493
00494
00495 if(bundle._fragmentoffset == 0){
00496
00497 fstream datei;
00498 completefilename << dbPath.getPath() << "/Fragment/" << filename;
00499 datei.open((completefilename.str()).c_str(), ios::binary);
00500 datei << bundle;
00501 datei.close();
00502 }
00503
00504 else{
00505
00506 fstream datei, tmpdatei;
00507 completefilename << dbPath.getPath() << "/Fragment/" << filename;
00508 tmpdatei.open((completefilename.str()).c_str(), ios::binary);
00509 tmpdatei << bundle;
00510
00511 }
00512
00513
00514
00515 if(bitcounter == bundle._appdatalength){
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535 }
00536 else{
00537 sqlite3_bind_text(store_Fragment, 1, sourceEID.c_str(), sourceEID.length(),SQLITE_TRANSIENT);
00538 sqlite3_bind_int(store_Fragment,2,bundle._timestamp);
00539 sqlite3_bind_int(store_Fragment,3,bundle._sequencenumber);
00540 sqlite3_bind_text(store_Fragment, 4,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
00541 sqlite3_bind_int(store_Fragment,5,TTL);
00542 sqlite3_bind_int(store_Fragment,6,priority);
00543 sqlite3_bind_int(store_Fragment,7,bundle._fragmentoffset);
00544 sqlite3_bind_int(store_Fragment,8,payloadsize);
00545 sqlite3_bind_int(store_Fragment,9,0);
00546 err = sqlite3_step(store_Fragment);
00547 if(err != SQLITE_OK){
00548 std::cerr << "SQLiteBundleStorage: storeFragment() failure: "<< err << " " << sqlite3_errmsg(database) << endl;
00549 }
00550 sqlite3_reset(store_Fragment);
00551 }
00552 }
00553 }
00554
00555 void SQLiteBundleStorage::raiseEvent(const Event *evt){
00556 const TimeEvent *time = dynamic_cast<const TimeEvent*>(evt);
00557 const GlobalEvent *global = dynamic_cast<const GlobalEvent*>(evt);
00558
00559 if (global != NULL)
00560 {
00561 if (global->getAction() == dtn::core::GlobalEvent::GLOBAL_SHUTDOWN)
00562 {
00563 {
00564 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00565 global_shutdown = true;
00566 }
00567 timeeventConditional.signal();
00568 dbMutex.signal(true);
00569 }
00570 }
00571
00572 if (time != NULL)
00573 {
00574
00575 if (time->getAction() == dtn::core::TIME_SECOND_TICK)
00576 {
00577
00578 }
00579 }
00580 }
00581
00582 void SQLiteBundleStorage::run(void){
00583 ibrcommon::MutexLock l(timeeventConditional);
00584 while(!global_shutdown){
00585 deleteexpired();
00586 timeeventConditional.wait();
00587 }
00588 }
00589
00590 void SQLiteBundleStorage::deleteexpired(){
00591
00592
00593
00594
00595
00596 }
00597 }
00598 }