00001
00002
00003
00004
00005
00006
00007
00008
00009 #ifndef SQLITEBUNDLESTORAGE_H_
00010 #define SQLITEBUNDLESTORAGE_H_
00011
00012 #include "BundleStorage.h"
00013 #include "Component.h"
00014 #include "EventReceiver.h"
00015 #include "core/BundleStorage.h"
00016 #include "core/EventReceiver.h"
00017 #include <ibrdtn/data/MetaBundle.h>
00018
00019 #include <ibrcommon/thread/Thread.h>
00020 #include <ibrcommon/thread/Conditional.h>
00021 #include <ibrcommon/data/File.h>
00022 #include <ibrcommon/thread/Queue.h>
00023
00024 #include <sqlite3.h>
00025 #include <string>
00026 #include <list>
00027 #include <set>
00028
00029
00030
00031 class SQLiteBundleStorageTest;
00032
00033 namespace dtn
00034 {
00035 namespace core
00036 {
00037 class SQLiteBundleStorage: public BundleStorage, public EventReceiver, public dtn::daemon::IndependentComponent, public ibrcommon::MutexInterface
00038 {
00039 enum SQL_TABLES
00040 {
00041 SQL_TABLE_BUNDLE = 0,
00042 SQL_TABLE_BLOCK = 1,
00043 SQL_TABLE_ROUTING = 2,
00044 SQL_TABLE_BUNDLE_ROUTING_INFO = 3,
00045 SQL_TABLE_NODE_ROUTING_INFO = 4,
00046 SQL_TABLE_END = 5
00047 };
00048
00049
00050 enum STORAGE_STMT
00051 {
00052 BUNDLE_GET_FILTER,
00053 BUNDLE_GET_ID,
00054 FRAGMENT_GET_ID,
00055 BUNDLE_GET_FRAGMENT,
00056
00057 EXPIRE_BUNDLES,
00058 EXPIRE_BUNDLE_FILENAMES,
00059 EXPIRE_BUNDLE_DELETE,
00060 EXPIRE_NEXT_TIMESTAMP,
00061
00062 EMPTY_CHECK,
00063 COUNT_ENTRIES,
00064
00065 BUNDLE_DELETE,
00066 FRAGMENT_DELETE,
00067 BUNDLE_CLEAR,
00068 BUNDLE_STORE,
00069 BUNDLE_UPDATE_CUSTODIAN,
00070 FRAGMENT_UPDATE_CUSTODIAN,
00071
00072 PROCFLAGS_SET,
00073
00074 BLOCK_GET_ID,
00075 BLOCK_GET_ID_FRAGMENT,
00076 BLOCK_GET,
00077 BLOCK_GET_FRAGMENT,
00078 BLOCK_CLEAR,
00079 BLOCK_STORE,
00080
00081 #ifdef SQLITE_STORAGE_EXTENDED
00082 ROUTING_GET,
00083 ROUTING_REMOVE,
00084 ROUTING_CLEAR,
00085 ROUTING_STORE,
00086
00087 NODE_GET,
00088 NODE_REMOVE,
00089 NODE_CLEAR,
00090 NODE_STORE,
00091
00092 INFO_GET,
00093 INFO_REMOVE,
00094 INFO_STORE,
00095 #endif
00096
00097 VACUUM,
00098 SQL_QUERIES_END
00099 };
00100
00101 static const std::string _tables[SQL_TABLE_END];
00102
00103
00104 static const std::string _sql_queries[SQL_QUERIES_END];
00105
00106
00107 static const std::string _db_structure[10];
00108
00109 public:
00110 friend class ::SQLiteBundleStorageTest;
00111
00112 class SQLBundleQuery
00113 {
00114 public:
00115 SQLBundleQuery();
00116 virtual ~SQLBundleQuery() = 0;
00117
00122 virtual const std::string getWhere() const = 0;
00123
00130 virtual size_t bind(sqlite3_stmt*, size_t offset) const
00131 {
00132 return offset;
00133 }
00134
00138 sqlite3_stmt *_statement;
00139 };
00140
00141 class SQLiteQueryException : public ibrcommon::Exception
00142 {
00143 public:
00144 SQLiteQueryException(string what = "Unable to execute Querry.") throw(): Exception(what)
00145 {
00146 }
00147 };
00148
00155 SQLiteBundleStorage(const ibrcommon::File &path, const size_t &size);
00156
00160 virtual ~SQLiteBundleStorage();
00161
00165 void openDatabase(const ibrcommon::File &path);
00166
00171 void store(const dtn::data::Bundle &bundle);
00172
00179 dtn::data::Bundle get(const dtn::data::BundleID &id);
00180
00184 const std::list<dtn::data::MetaBundle> get(BundleFilterCallback &cb);
00185
00186 #ifdef SQLITE_STORAGE_EXTENDED
00187
00192 void storeBundleRoutingInfo(const data::BundleID &BundleID, const int &key, const string &routingInfo);
00193
00199 void storeNodeRoutingInfo(const data::EID &nodel, const int &key, const std::string &routingInfo);
00200
00206 void storeRoutingInfo(const int &key, const std::string &routingInfo);
00207
00212 std::string getBundleRoutingInfo(const data::BundleID &bundleID, const int &key);
00213
00219 std::string getNodeRoutingInfo(const data::EID &eid, const int &key);
00220
00226 std::string getRoutingInfo(const int &key);
00227
00232 void removeBundleRoutingInfo(const data::BundleID &bundleID, const int &key);
00233
00238 void removeNodeRoutingInfo(const data::EID &eid, const int &key);
00239
00244 void removeRoutingInfo(const int &key);
00245 #endif
00246
00252 void remove(const dtn::data::BundleID &id);
00253
00257 void clear();
00258
00262 void clearAll();
00263
00267 bool empty();
00268
00272 unsigned int count();
00273
00277 void releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id);
00278
00279 #ifdef SQLITE_STORAGE_EXTENDED
00280
00285 void setPriority(const int priority,const dtn::data::BundleID &id);
00286
00293 list<data::Block> getBlock(const data::BundleID &bundleID,const char &blocktype);
00294
00299 int occupiedSpace();
00300 #endif
00301
00306 void raiseEvent(const Event *evt);
00307
00312 dtn::data::MetaBundle remove(const ibrcommon::BloomFilter&) { return dtn::data::MetaBundle(); };
00313
00317 virtual void trylock() throw (ibrcommon::MutexException);
00318
00322 virtual void enter() throw (ibrcommon::MutexException);
00323
00327 virtual void leave() throw (ibrcommon::MutexException);
00328
00329 protected:
00330 virtual void componentRun();
00331 virtual void componentUp();
00332 virtual void componentDown();
00333 bool __cancellation();
00334
00335 private:
00336 enum Position
00337 {
00338 FIRST_FRAGMENT = 0,
00339 LAST_FRAGMENT = 1,
00340 BOTH_FRAGMENTS = 2
00341 };
00342
00343 class AutoResetLock
00344 {
00345 public:
00346 AutoResetLock(ibrcommon::Mutex &mutex, sqlite3_stmt *st);
00347 ~AutoResetLock();
00348
00349 private:
00350 ibrcommon::MutexLock _lock;
00351 sqlite3_stmt *_st;
00352 };
00353
00354 class Task
00355 {
00356 public:
00357 virtual ~Task() {};
00358 virtual void run(SQLiteBundleStorage &storage) = 0;
00359 };
00360
00361 class BlockingTask : public Task
00362 {
00363 public:
00364 BlockingTask() : _done(false), _abort(false) {};
00365 virtual ~BlockingTask() {};
00366 virtual void run(SQLiteBundleStorage &storage) = 0;
00367
00371 void wait()
00372 {
00373 ibrcommon::MutexLock l(_cond);
00374 while (!_done && !_abort) _cond.wait();
00375
00376 if (_abort) throw ibrcommon::Exception("Task aborted");
00377 }
00378
00379 void abort()
00380 {
00381 ibrcommon::MutexLock l(_cond);
00382 _abort = true;
00383 _cond.signal(true);
00384 }
00385
00386 void done()
00387 {
00388 ibrcommon::MutexLock l(_cond);
00389 _done = true;
00390 _cond.signal(true);
00391 }
00392
00393 private:
00394 ibrcommon::Conditional _cond;
00395 bool _done;
00396 bool _abort;
00397 };
00398
00399 class TaskRemove : public Task
00400 {
00401 public:
00402 TaskRemove(const dtn::data::BundleID &id)
00403 : _id(id) { };
00404
00405 virtual ~TaskRemove() {};
00406 virtual void run(SQLiteBundleStorage &storage);
00407
00408 private:
00409 const dtn::data::BundleID _id;
00410 };
00411
00412 class TaskIdle : public Task
00413 {
00414 public:
00415 TaskIdle() { };
00416
00417 virtual ~TaskIdle() {};
00418 virtual void run(SQLiteBundleStorage &storage);
00419
00420 static ibrcommon::Mutex _mutex;
00421 static bool _idle;
00422 };
00423
00424 class TaskExpire : public Task
00425 {
00426 public:
00427 TaskExpire(size_t timestamp)
00428 : _timestamp(timestamp) { };
00429
00430 virtual ~TaskExpire() {};
00431 virtual void run(SQLiteBundleStorage &storage);
00432
00433 private:
00434 size_t _timestamp;
00435 };
00436
00442 dtn::data::MetaBundle get_meta(const dtn::data::BundleID &id);
00443
00444 void get(sqlite3_stmt *st, dtn::data::MetaBundle &bundle, size_t offset = 0);
00445 void get(sqlite3_stmt *st, dtn::data::Bundle &bundle, size_t offset = 0);
00446
00447 #ifdef SQLITE_STORAGE_EXTENDED
00448
00452 void storeFragment(const dtn::data::Bundle &bundle);
00453 #endif
00454
00460 sqlite3_stmt* prepare(const std::string &sqlquery);
00461
00469 int prepareBundle(list<std::string> &filenames, dtn::data::Bundle &bundle);
00470
00476 int store_blocks(const data::Bundle &Bundle);
00477
00483 void get_blocks(dtn::data::Bundle &bundle, const dtn::data::BundleID &id);
00484
00488 void check_consistency();
00489
00490 void check_fragments(set<string> &payloadFiles);
00491
00492 void check_bundles(set<string> &blockFiles);
00493
00497 void update_expire_time();
00498
00502 void update_custodian(const dtn::data::BundleID &b, const dtn::data::EID &custodian);
00503
00508 void new_expire_time(size_t ttl);
00509 void reset_expire_time();
00510 size_t get_expire_time();
00511
00512 void set_bundleid(sqlite3_stmt *st, const dtn::data::BundleID &id, size_t offset = 0) const;
00513 void get_bundleid(sqlite3_stmt *st, dtn::data::BundleID &id, size_t offset = 0) const;
00514
00518 virtual const std::string getName() const;
00519
00520 ibrcommon::File dbPath;
00521 ibrcommon::File dbFile;
00522 ibrcommon::File _blockPath;
00523
00524 int dbSize;
00525
00526
00527 sqlite3 *_database;
00528
00529
00530 ibrcommon::Queue<Task*> _tasks;
00531
00532 ibrcommon::Mutex _next_expiration_lock;
00533 size_t _next_expiration;
00534
00535
00536 sqlite3_stmt* _statements[SQL_QUERIES_END];
00537
00538
00539 ibrcommon::Mutex _locks[SQL_QUERIES_END];
00540
00541 void add_deletion(const dtn::data::BundleID &id);
00542 void remove_deletion(const dtn::data::BundleID &id);
00543 bool contains_deletion(const dtn::data::BundleID &id);
00544
00545
00546 ibrcommon::Mutex _deletion_mutex;
00547 std::set<dtn::data::BundleID> _deletion_list;
00548 };
00549 }
00550 }
00551
00552 #endif