36 #include <ibrcommon/thread/MutexLock.h>
37 #include <ibrcommon/thread/RWLock.h>
38 #include <ibrcommon/data/BLOB.h>
39 #include <ibrcommon/Logger.h>
47 const std::string SQLiteBundleStorage::TAG =
"SQLiteBundleStorage";
49 ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
50 bool SQLiteBundleStorage::TaskIdle::_idle =
false;
52 SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(
const ibrcommon::File &path)
57 SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB()
63 void SQLiteBundleStorage::SQLiteBLOB::clear()
69 _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary );
71 if (!_filestream.is_open())
73 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) <<
"can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
74 throw ibrcommon::CanNotOpenFileException(_file);
78 void SQLiteBundleStorage::SQLiteBLOB::open()
80 ibrcommon::BLOB::_filelimit.wait();
83 _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary );
85 if (!_filestream.is_open())
87 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) <<
"can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
88 throw ibrcommon::CanNotOpenFileException(_file);
92 void SQLiteBundleStorage::SQLiteBLOB::close()
100 ibrcommon::BLOB::_filelimit.post();
103 std::streamsize SQLiteBundleStorage::SQLiteBLOB::__get_size()
110 return ibrcommon::BLOB::Reference(
new SQLiteBLOB(_blobPath));
114 :
BundleStorage(maxsize), _database(path.get(
"sqlite.db"), *this)
117 if (usePersistentBundleSets)
121 ibrcommon::BLOB::changeProvider(
this,
false);
124 _blockPath = path.get(
"blocks");
125 _blobPath = path.get(
"blob");
128 ibrcommon::RWLock l(_global_lock);
131 _blobPath.remove(
true);
134 ibrcommon::File::createDirectory( _blobPath );
137 ibrcommon::File::createDirectory( _blockPath );
141 }
catch (
const ibrcommon::Exception &ex) {
142 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
152 ibrcommon::RWLock l(_global_lock);
156 }
catch (
const ibrcommon::Exception &ex) {
157 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
167 Task *t = _tasks.poll();
170 BlockingTask &btask =
dynamic_cast<BlockingTask&
>(*t);
173 }
catch (
const std::exception&) {
179 }
catch (
const std::bad_cast&) { };
182 std::auto_ptr<Task> killer(t);
184 }
catch (
const std::exception&) { };
186 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
203 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
227 ibrcommon::MutexLock l(_global_lock);
230 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
237 ibrcommon::MutexLock l(_global_lock);
238 _database.get(cb, result);
247 ibrcommon::MutexLock l(_global_lock);
250 _database.
get(
id, bundle, blocks);
252 for (SQLiteDatabase::blocklist::const_iterator iter = blocks.begin(); iter != blocks.end(); ++iter)
255 const int blocktyp = entry.first;
256 const ibrcommon::File &file = entry.second;
258 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 50) <<
"add block: " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
261 std::ifstream is(file.getPath().c_str(), std::ios::binary | std::ios::in);
266 SQLiteBLOB *blob =
new SQLiteBLOB(_blobPath);
269 ibrcommon::BLOB::Reference ref(blob);
273 blob->_file.remove();
276 if ( ::link(file.getPath().c_str(), blob->_file.getPath().c_str()) != 0 )
278 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) <<
"hard-link failed (" << errno <<
") " << blob->_file.getPath() <<
" -> " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
281 std::ofstream fout(blob->_file.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
284 ibrcommon::BLOB::iostream stream = ref.iostream();
286 const std::streamsize length = stream.size();
287 ibrcommon::BLOB::copy(fout, (*stream), length);
297 }
catch (
const ibrcommon::Exception &ex) {
299 blob->_file.remove();
301 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) <<
"unable to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
319 time_t age = file.lastaccess() - file.lastmodify();
322 }
catch (
const std::bad_cast&) { };
329 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
342 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) <<
"store bundle " << bundle.
toString() << IBRCOMMON_LOGGER_ENDL;
344 ibrcommon::RWLock l(_global_lock);
358 _database.
store(bundle, size);
377 ibrcommon::TemporaryFile tmpfile(_blockPath,
"payload");
381 ibrcommon::BLOB::Reference ref = payload.
getBLOB();
382 ibrcommon::BLOB::iostream stream = ref.iostream();
385 const SQLiteBLOB &blob =
dynamic_cast<const SQLiteBLOB&
>(*ref);
391 if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 )
393 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) <<
"hard-link failed (" << errno <<
") " << tmpfile.getPath() <<
" -> " << blob._file.getPath() << IBRCOMMON_LOGGER_ENDL;
396 std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
398 const std::streamsize length = stream.size();
399 ibrcommon::BLOB::copy(fout, (*stream), length);
401 }
catch (
const std::bad_cast&) {
403 std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
405 const std::streamsize length = stream.size();
406 ibrcommon::BLOB::copy(fout, (*stream), length);
408 }
catch (
const std::bad_cast&) {
411 throw ibrcommon::Exception(
"not a payload block");
415 storedBytes += tmpfile.size();
418 _database.
store(
id, index, block, tmpfile);
422 ibrcommon::TemporaryFile tmpfile(_blockPath,
"block");
424 std::ofstream filestream(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
430 storedBytes += tmpfile.size();
433 _database.
store(
id, index, block, tmpfile);
448 }
catch (
const ibrcommon::Exception&) {
452 IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 10) <<
"bundle " << bundle.
toString() <<
" stored" << IBRCOMMON_LOGGER_ENDL;
456 }
catch (
const ibrcommon::Exception &ex) {
457 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
468 ibrcommon::MutexLock l(_global_lock);
478 ibrcommon::MutexLock l(_global_lock);
480 _database.
get(
id, ret);
491 ibrcommon::RWLock l(_global_lock);
496 }
catch (
const ibrcommon::Exception &ex) {
497 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
503 ibrcommon::RWLock l(_global_lock);
507 }
catch (
const ibrcommon::Exception &ex) {
508 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
512 _blockPath.remove(
true);
513 ibrcommon::File::createDirectory(_blockPath);
522 ibrcommon::MutexLock l(_global_lock);
523 return _database.
empty();
524 }
catch (
const ibrcommon::Exception &ex) {
525 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
533 ibrcommon::MutexLock l(_global_lock);
534 return _database.
count();
535 }
catch (
const ibrcommon::Exception &ex) {
536 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
545 _tasks.push(
new TaskExpire(time.getTimestamp()));
554 ibrcommon::MutexLock l(TaskIdle::_mutex);
555 TaskIdle::_idle =
true;
558 _tasks.push(
new TaskIdle());
563 ibrcommon::MutexLock l(TaskIdle::_mutex);
564 TaskIdle::_idle =
false;
571 ibrcommon::RWLock l(storage._global_lock);
572 storage._database.
expire(_timestamp);
573 }
catch (
const ibrcommon::Exception &ex) {
574 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
578 void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
593 ibrcommon::RWLock l(storage._global_lock);
594 storage._database.vacuum();
595 }
catch (
const ibrcommon::Exception &ex) {
596 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
600 ibrcommon::Thread::sleep(1000);
602 ibrcommon::MutexLock l(TaskIdle::_mutex);
603 if (!TaskIdle::_idle)
return;
607 const std::string SQLiteBundleStorage::getName()
const
609 return "SQLiteBundleStorage";
615 ibrcommon::MutexLock l(_global_lock);
621 }
catch (
const ibrcommon::Exception &ex) {
622 IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
638 eventBundleRemoved(
id);
646 _tasks.wait(ibrcommon::Queue<Task*>::QUEUE_EMPTY);
std::string toString() const
void releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
static void add(EventReceiver< E > *receiver)
std::set< dtn::data::EID > eid_set
dtn::data::Length size() const
void raiseEvent(const dtn::core::TimeEvent &evt)
dtn::data::Bundle get(const dtn::data::BundleID &id)
virtual Length getLength(const dtn::data::Bundle &obj)
void addSeconds(const dtn::data::Number &value)
bool contains(const dtn::data::BundleID &id)
void setFaulty(bool mode)
virtual const eid_set getDistinctDestinations()
const dtn::data::EID acceptCustody(const dtn::data::MetaBundle &meta)
static void remove(const EventReceiver< E > *receiver)
SQLiteBundleStorage(const ibrcommon::File &path, const dtn::data::Length &maxsize, bool usePersistentBundleSets=false)
dtn::data::Block & readBlock()
void freeSpace(const dtn::data::Length &size)
void remove(const dtn::data::BundleID &id)
void update(UPDATE_VALUES, const dtn::data::BundleID &id, const dtn::data::EID &)
void allocSpace(const dtn::data::Length &size)
virtual void componentDown()
void store(const dtn::data::Bundle &bundle, const dtn::data::Length &size)
void eventBundleAdded(const dtn::data::MetaBundle &b)
block_list::const_iterator const_iterator
virtual ~SQLiteBundleStorage()
dtn::data::Size count() const
dtn::data::Length remove(const dtn::data::BundleID &id)
std::pair< int, const ibrcommon::File > blocklist_entry
static void setFactory(dtn::data::BundleSet::Factory *)
void expire(const dtn::data::Timestamp ×tamp)
virtual void componentUp()
virtual const eid_set getDistinctDestinations()
void iterateDatabase(const dtn::data::MetaBundle &bundle, const dtn::data::Length size)
virtual void setFaulty(bool mode)
void eventBundleExpired(const dtn::data::BundleID &id, const dtn::data::Length size)
static const dtn::data::block_t BLOCK_TYPE
void eventBundleRemoved(const dtn::data::BundleID &id)
bool contains(const dtn::data::BundleID &id)
virtual dtn::data::MetaBundle info(const dtn::data::BundleID &id)
ibrcommon::BLOB::Reference getBLOB() const
virtual void get(const BundleSelector &cb, BundleResult &result)
const block_t & getType() const
ibrcommon::BLOB::Reference create()
void store(const dtn::data::Bundle &bundle)
std::list< std::pair< int, const ibrcommon::File > > blocklist
virtual void componentRun()