00001
00002
00003
00004
00005
00006
00007
00008 #include "core/SQLiteBundleStorage.h"
00009 #include "core/TimeEvent.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleCore.h"
00012
00013 #include <ibrdtn/data/Bundle.h>
00014 #include <ibrdtn/data/BundleID.h>
00015 #include <ibrdtn/data/Serializer.h>
00016 #include <ibrdtn/data/PayloadBlock.h>
00017
00018 #include <ibrcommon/thread/MutexLock.h>
00019 #include <ibrcommon/Logger.h>
00020 #include <ibrcommon/AutoDelete.h>
00021
00022 #include <iostream>
00023 #include <list>
00024 #include <stdio.h>
00025 #include <vector>
00026
00027 namespace dtn {
00028 namespace core {
00029
00030 SQLiteBundleStorage::SQLiteBundleStorage(ibrcommon::File dbPath ,string dbFile , size_t size)
00031 : _BundleTable("Bundles"), _FragmentTable("Fragments"), dbPath(dbPath), dbFile(dbFile), dbSize(size)
00032 {
00033 }
00034
00035 SQLiteBundleStorage::~SQLiteBundleStorage()
00036 {
00037 }
00038
00039 void SQLiteBundleStorage::componentUp()
00040 {
00041 int err,filename;
00042 list<int> inconsistentList;
00043 list<int>::iterator it;
00044
00045
00046 ibrcommon::File db = dbPath.get(dbFile);
00047
00048
00049 err = sqlite3_open_v2(db.getPath().c_str(),&database,SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL);
00050 if (err){
00051 std::cerr << "Can't open database: " << sqlite3_errmsg(database);
00052 sqlite3_close(database);
00053 throw "Unable to open SQLite Database";
00054 }
00055
00056
00057 sqlite3_stmt *createBundleTable = prepareStatement("create table if not exists "+_BundleTable+" (Filename INTEGER PRIMARY KEY ASC,BundleID text, Destination text, TTL int, Priority int);");
00058 err = sqlite3_step(createBundleTable);
00059 if(err != SQLITE_DONE){
00060 std::cerr << "SQLiteBundleStorage: Constructorfailure: createTable failed " << err;
00061 }
00062 sqlite3_finalize(createBundleTable);
00063
00064
00065 sqlite3_stmt *createFragmentTable = prepareStatement("create table if not exists "+_FragmentTable+" (Filename INTEGER PRIMARY KEY ASC, Source text, Timestamp int, Sequencenumber int, Destination text, TTL int, Priority int, FragementationOffset int, PayloadLength int, file int);");
00066 err = sqlite3_step(createFragmentTable);
00067 if(err != SQLITE_DONE){
00068 std::cerr << "SQLiteBundleStorage: Constructorfailure: createTable failed " << err;
00069 }
00070 sqlite3_finalize(createFragmentTable);
00071
00072
00073 sqlite3_stmt *conistencycheck = prepareStatement("SELECT ROWID FROM "+_BundleTable+";");
00074 stringstream name;
00075 for(int i = sqlite3_step(conistencycheck); i == SQLITE_ROW; i=sqlite3_step(conistencycheck)){
00076 filename = sqlite3_column_int(conistencycheck,0);
00077
00078 stringstream ss; ss << filename;
00079 ibrcommon::File datafile = dbPath.get(ss.str());
00080 FILE *fpointer = fopen(datafile.getPath().c_str(),"r");
00081 if (fpointer == NULL){
00082
00083 inconsistentList.push_front(filename);
00084 cerr << "datenbank inkonsistent" << endl;
00085 }
00086 else{
00087 fclose(fpointer);
00088 }
00089 }
00090 sqlite3_finalize(conistencycheck);
00091
00092
00093
00094
00095 sqlite3_stmt *deleteBundle = prepareStatement("DELETE FROM "+_BundleTable+" WHERE ROWID = ?;");
00096 for(it = inconsistentList.begin(); it != inconsistentList.end(); it++){
00097
00098 sqlite3_bind_int(deleteBundle,1,(*it));
00099 err = sqlite3_step(deleteBundle);
00100 if ( err != SQLITE_DONE )
00101 {
00102 std::cerr << "SQLiteBundlestorage: failure in prepareStatement: " << err << std::endl;
00103 }
00104 sqlite3_reset(deleteBundle);
00105 }
00106 sqlite3_finalize(deleteBundle);
00107
00108
00109 getTTL = prepareStatement("SELECT TTL FROM "+_BundleTable+" ASC;");
00110
00111 getBundleByDestination = prepareStatement("SELECT ROWID FROM "+_BundleTable+" WHERE Destination = ? ORDER BY TTL ASC;");
00112 getBundleByID = prepareStatement("SELECT ROWID FROM "+_BundleTable+" WHERE BundleID = ?;");
00113 getFragements = prepareStatement("SELECT * FROM "+_FragmentTable+" WHERE Source = ? AND Timestamp = ? ORDER BY FragementationOffset ASC;");
00114 store_Bundle = prepareStatement("INSERT INTO "+_BundleTable+" (BundleID, Destination, TTL, Priority) VALUES (?,?,?,?);");
00115 store_Fragment = prepareStatement("INSERT INTO "+_FragmentTable+" (Source, Timestamp, Sequencenumber, Destination, TTL, Priority, FragementationOffset ,PayloadLength,file) VALUES (?,?,?,?,?,?,?,?,?);");
00116 clearStorage = prepareStatement("DELETE FROM "+_BundleTable+";");
00117 countEntries = prepareStatement("SELECT Count(ROWID) From "+_BundleTable+";");
00118 vacuum = prepareStatement("vacuum;");
00119 getROWID = prepareStatement("select ROWID from "+_BundleTable+";");
00120 removeBundle = prepareStatement("Delete From "+_BundleTable+" Where BundleID = ?;");
00121 removeFragments = prepareStatement("DELETE FROM "+_FragmentTable+" WHERE Source = ? AND Timestamp = ?;");
00122
00123
00124 bindEvent(TimeEvent::className);
00125 bindEvent(GlobalEvent::className);
00126 }
00127
00128 void SQLiteBundleStorage::componentDown()
00129 {
00130 {
00131 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00132
00133
00134 sqlite3_finalize(getTTL);
00135 sqlite3_finalize(getBundleByDestination);
00136 sqlite3_finalize(getBundleByID);
00137 sqlite3_finalize(store_Bundle);
00138 sqlite3_finalize(clearStorage);
00139 sqlite3_finalize(countEntries);
00140 sqlite3_finalize(vacuum);
00141 sqlite3_finalize(getROWID);
00142 sqlite3_finalize(removeBundle);
00143
00144
00145 sqlite3_close(database);
00146 }
00147
00148
00149 unbindEvent(TimeEvent::className);
00150 unbindEvent(GlobalEvent::className);
00151
00152 _tasks.unblock();
00153 }
00154
00155 void SQLiteBundleStorage::releaseCustody(dtn::data::BundleID&)
00156 {
00157
00158
00159 }
00160
00161 sqlite3_stmt* SQLiteBundleStorage::prepareStatement(string sqlquery){
00162
00163 sqlite3_stmt *statement;
00164 int err = sqlite3_prepare_v2(database, sqlquery.c_str(), sqlquery.length(), &statement, 0);
00165 if ( err != SQLITE_OK )
00166 {
00167 std::cerr << "SQLiteBundlestorage: failure in prepareStatement: " << err << " with Query: " << sqlquery << std::endl;
00168 }
00169 return statement;
00170 }
00171
00172 dtn::data::Bundle SQLiteBundleStorage::get(const ibrcommon::BloomFilter &filter)
00173 {
00174 throw BundleStorage::NoBundleFoundException();
00175 }
00176
00177 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::BundleID &id){
00178 int err, filename;
00179 data::Bundle bundle;
00180 stringstream completefilename;
00181 fstream datei;
00182
00183
00184 string ID = id.toString();
00185
00186 {
00187 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00188 sqlite3_bind_text(getBundleByID, 1, ID.c_str(), ID.length(),SQLITE_TRANSIENT);
00189
00190
00191 err = sqlite3_step(getBundleByID);
00192
00193
00194 if(err != SQLITE_ROW){
00195 std::cerr << "SQLiteBundleStorage: No Bundle found with BundleID: " << id.toString() <<endl;
00196 throw BundleStorage::NoBundleFoundException();
00197 }
00198 filename = sqlite3_column_int(getBundleByID, 0);
00199
00200 err = sqlite3_step(getBundleByID);
00201 if (err != SQLITE_DONE){
00202 sqlite3_reset(getBundleByID);
00203 throw "SQLiteBundleStorage: Database contains two or more Bundle with the same BundleID, the requested Bundle is maybe a Fragment";
00204 }
00205
00206 completefilename << dbPath.getPath() << "/" << filename;
00207 datei.open((completefilename.str()).c_str(), ios::in|ios::binary);
00208 dtn::data::DefaultDeserializer(datei) >> bundle;
00209 datei.close();
00210
00211
00212 sqlite3_reset(getBundleByID);
00213 }
00214
00215 return bundle;
00216 }
00217
00218 dtn::data::Bundle SQLiteBundleStorage::get(const dtn::data::EID &eid){
00219 dtn::data::Bundle bundle;
00220
00221 int err,filename;
00222 stringstream completefilename;
00223 string ID = eid.getString();
00224
00225
00226
00227
00228 err = sqlite3_bind_text(getBundleByDestination, 1, ID.c_str(), ID.length(),SQLITE_TRANSIENT);
00229
00230
00231 err = sqlite3_step(getBundleByDestination);
00232
00233
00234 if(err != SQLITE_ROW){
00235 sqlite3_reset(getBundleByDestination);
00236 throw BundleStorage::NoBundleFoundException();
00237 }
00238 filename = sqlite3_column_int(getBundleByDestination, 0);
00239
00240 fstream datei;
00241 completefilename << dbPath.getPath() << "/" << filename;
00242 datei.open((completefilename.str()).c_str(), ios::in|ios::binary);
00243 dtn::data::DefaultDeserializer(datei) >> bundle;
00244 datei.close();
00245
00246
00247 sqlite3_reset(getBundleByID);
00248
00249 return bundle;
00250 }
00251
00252 void SQLiteBundleStorage::store(const dtn::data::Bundle &bundle){
00253
00254
00255
00256
00257
00258
00259 int err, TTL, priority;
00260 stringstream stream_bundleid, completefilename;
00261 string bundlestring, destination;
00262
00263
00264 priority = 2 * (bundle._procflags & dtn::data::Bundle::PRIORITY_BIT2) + (bundle._procflags & dtn::data::Bundle::PRIORITY_BIT1);
00265
00266 destination = bundle._destination.getString();
00267 stream_bundleid << "[" << bundle._timestamp << "." << bundle._sequencenumber << "] " << bundle._source.getString();
00268 TTL = bundle._timestamp + bundle._lifetime;
00269
00270 {
00271 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00272 sqlite3_bind_text(store_Bundle, 1, stream_bundleid.str().c_str(), stream_bundleid.str().length(),SQLITE_TRANSIENT);
00273 sqlite3_bind_text(store_Bundle, 2,destination.c_str(), destination.length(),SQLITE_TRANSIENT);
00274 sqlite3_bind_int(store_Bundle,3,TTL);
00275 sqlite3_bind_int(store_Bundle,4,priority);
00276 err = sqlite3_step(store_Bundle);
00277 if(err != SQLITE_DONE){
00278 std::cerr << "SQLiteBundleStorage: store() failure: "<< err << " " << sqlite3_errmsg(database) <<endl;
00279 }
00280 sqlite3_reset(store_Bundle);
00281
00282 int filename = sqlite3_last_insert_rowid(database);
00283 fstream datei;
00284 completefilename << dbPath.getPath() << "/" << filename;
00285 datei.open((completefilename.str()).c_str(), ios::out|ios::binary);
00286 dtn::data::DefaultSerializer(datei) << bundle;
00287 datei.close();
00288 }
00289
00290 dbMutex.signal(true);
00291 }
00292
00293 void SQLiteBundleStorage::remove(const dtn::data::BundleID &id){
00294 int err, filename;
00295 stringstream file;
00296
00297 {
00298 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00299 sqlite3_bind_text(getBundleByID, 1, id.toString().c_str(), id.toString().length(),SQLITE_TRANSIENT);
00300 err = sqlite3_step(getBundleByID);
00301 if(err != SQLITE_OK){
00302 std::cerr << "SQLiteBundleStorage: failure while getting filename: " << id.toString() << " errmsg: " << err <<endl;
00303 }
00304 filename = sqlite3_column_int(getBundleByID,0);
00305 file << dbPath.getPath() << "/" << filename;
00306 sqlite3_reset(removeBundle);
00307 err = ::remove(file.str().c_str());
00308 if(err != SQLITE_OK){
00309 std::cerr << "SQLiteBundleStorage: remove():Datei konnte nicht gelöscht werden " << " errmsg: " << err <<endl;
00310 }
00311
00312 sqlite3_bind_text(removeBundle, 1, id.toString().c_str(), id.toString().length(),SQLITE_TRANSIENT);
00313 err = sqlite3_step(removeBundle);
00314 if(err != SQLITE_OK){
00315 std::cerr << "SQLiteBundleStorage: failure while removing: " << id.toString() << " errmsg: " << err <<endl;
00316 }
00317 sqlite3_reset(removeBundle);
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328 err = sqlite3_step(vacuum);
00329 sqlite3_reset(vacuum);
00330 }
00331 }
00332
00333 void SQLiteBundleStorage::clear()
00334 {
00335 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00336 sqlite3_step(clearStorage);
00337 sqlite3_reset(clearStorage);
00338 if(SQLITE_OK != sqlite3_step(vacuum)){
00339 std::cerr << "SQLiteBundleStorage: failure while processing vacuum.";
00340 }
00341 sqlite3_reset(vacuum);
00342 }
00343
00344 bool SQLiteBundleStorage::empty(){
00345 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00346 if (SQLITE_DONE == sqlite3_step(getROWID)){
00347 sqlite3_reset(getROWID);
00348 return true;
00349 }
00350 else{
00351 sqlite3_reset(getROWID);
00352 return false;
00353 }
00354 }
00355
00356 unsigned int SQLiteBundleStorage::count(){
00357 ibrcommon::MutexLock lock = ibrcommon::MutexLock(dbMutex);
00358 int rows = 0,err;
00359 err = sqlite3_step(countEntries);
00360 rows = sqlite3_column_int(countEntries,0);
00361 sqlite3_reset(countEntries);
00362 return rows;
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427
00428
00429
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539
00540 void SQLiteBundleStorage::raiseEvent(const Event *evt)
00541 {
00542 try {
00543 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
00544
00545 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00546 {
00547
00548
00549 }
00550 } catch (std::bad_cast) {
00551
00552 }
00553 }
00554
00555 void SQLiteBundleStorage::componentRun(void)
00556 {
00557 try {
00558 while (isRunning())
00559 {
00560 Task *t = _tasks.blockingpop();
00561 ibrcommon::AutoDelete<Task> ad(t);
00562
00563 try {
00564 deleteexpired();
00565
00566 } catch (...) {
00567
00568 }
00569 }
00570 } catch (ibrcommon::Exception) {
00571
00572 }
00573 }
00574
00575 void SQLiteBundleStorage::deleteexpired(){
00576
00577
00578
00579
00580
00581 }
00582 }
00583 }