30 #include <ibrcommon/thread/RWLock.h>
31 #include <ibrcommon/Logger.h>
46 const std::string SimpleBundleStorage::TAG =
"SimpleBundleStorage";
49 :
BundleStorage(maxsize), _datastore(*this, workdir, buffer_limit), _metastore(this)
59 IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) <<
"element successfully stored: " << hash.
value << IBRCOMMON_LOGGER_ENDL;
61 ibrcommon::RWLock l(_pending_lock);
62 _pending_bundles.erase(hash);
67 IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) <<
"store of element " << hash.
value <<
" failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
72 ibrcommon::MutexLock l(_pending_lock);
80 ibrcommon::RWLock l(_pending_lock);
83 _pending_bundles.erase(hash);
86 ibrcommon::RWLock l(_meta_lock);
94 IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) <<
"element successfully removed: " << hash.
value << IBRCOMMON_LOGGER_ENDL;
96 ibrcommon::RWLock l(_meta_lock);
115 IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) <<
"remove of element " << hash.
value <<
" failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
151 ibrcommon::RWLock l(_meta_lock);
154 _metastore.
store(meta, bundle_size);
159 IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 10) <<
"bundle restored " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
160 }
catch (
const std::exception&) {
162 IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) <<
"Unable to restore bundle from file " << hash.
value << IBRCOMMON_LOGGER_ENDL;
178 ibrcommon::MutexLock l(_meta_lock);
179 IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG,
info) << _metastore.
size() <<
" Bundles restored." << IBRCOMMON_LOGGER_ENDL;
186 }
catch (
const ibrcommon::ThreadException &ex) {
187 IBRCOMMON_LOGGER_TAG(
"SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
205 ibrcommon::RWLock l(_meta_lock);
208 }
catch (
const ibrcommon::Exception &ex) {
209 IBRCOMMON_LOGGER_TAG(
"SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
217 ibrcommon::RWLock l(_meta_lock);
218 _metastore.expire(time.getTimestamp());
224 return "SimpleBundleStorage";
229 ibrcommon::MutexLock l(_meta_lock);
230 return _metastore.
empty();
241 ibrcommon::MutexLock l(_meta_lock);
242 return _metastore.
size();
258 size_t items_added = 0;
261 ibrcommon::MutexLock l(_meta_lock);
263 for (
MetaStorage::const_iterator iter = _metastore.begin(); (iter != _metastore.end()) && ((cb.limit() == 0) || (items_added < cb.limit())); ++iter)
270 if ( cb.addIfSelected(result, meta) ) items_added++;
279 ibrcommon::MutexLock l(_meta_lock);
294 ibrcommon::MutexLock l(_pending_lock);
296 pending_map::iterator it = _pending_bundles.find(hash);
298 if (_pending_bundles.end() != it)
313 }
catch (
const std::exception &ex) {
321 time_t age = stream.lastaccess() - stream.lastmodify();
332 IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) <<
"failed to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
345 ibrcommon::MutexLock l(_meta_lock);
373 __store(ca_bundle, bundle_size);
374 }
catch (
const ibrcommon::Exception&) {
376 __store(bundle, bundle_size);
382 ibrcommon::MutexLock l(_meta_lock);
390 ibrcommon::MutexLock l(_meta_lock);
398 ibrcommon::MutexLock l(_meta_lock);
424 std::auto_ptr<BundleContainer> bc(
new BundleContainer(bundle));
429 ibrcommon::RWLock l(_pending_lock);
432 _pending_bundles[hash] = bundle;
437 ibrcommon::RWLock l(_meta_lock);
440 _metastore.
store(meta, bundle_size);
444 _datastore.
store(hash, bc.get());
453 ibrcommon::RWLock l(_meta_lock);
473 _datastore.remove(hash);
482 eventBundleRemoved(b);
485 SimpleBundleStorage::BundleContainer::BundleContainer(
const dtn::data::Bundle &b)
489 SimpleBundleStorage::BundleContainer::~BundleContainer()
492 std::string SimpleBundleStorage::BundleContainer::getId()
const
494 return createId(_bundle);
499 std::stringstream ss_hash, ss_raw;
502 int c = 0xff & ss_raw.get();
503 while (ss_raw.good())
505 ss_hash << std::hex << std::setw( 2 ) << std::setfill(
'0' ) << c;
506 c = 0xff & ss_raw.get();
509 return ss_hash.str();
512 std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
521 s << _bundle; stream.flush();
526 std::stringstream ss; ss <<
"Output stream went bad [" << std::strerror(errno) <<
"]";
531 if (static_cast<std::streamoff>(size) > stream.tellp())
533 std::stringstream ss; ss <<
"Not all data were written [" << stream.tellp() <<
" of " << size <<
" bytes]";
virtual void componentUp()
virtual void eventDataStorageStoreFailed(const dtn::storage::DataStorage::Hash &hash, const ibrcommon::Exception &)
void remove(const dtn::data::BundleID &id)
virtual dtn::data::MetaBundle info(const dtn::data::BundleID &id)
virtual void eventDataStorageStored(const dtn::storage::DataStorage::Hash &hash)
static void add(EventReceiver< E > *receiver)
std::set< dtn::data::EID > eid_set
virtual Length getLength(const dtn::data::Bundle &obj)
void addSeconds(const dtn::data::Number &value)
void setFaulty(bool mode)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)
const dtn::data::EID acceptCustody(const dtn::data::MetaBundle &meta)
static void remove(const EventReceiver< E > *receiver)
void releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
virtual void eventDataStorageRemoved(const dtn::storage::DataStorage::Hash &hash)
void raiseEvent(const dtn::core::TimeEvent &evt)
void freeSpace(const dtn::data::Length &size)
virtual const eid_set getDistinctDestinations()
virtual bool contains(const dtn::data::BundleID &id)
static void raise(const dtn::data::Bundle &bundle)
void allocSpace(const dtn::data::Length &size)
virtual void eventBundleExpired(const dtn::data::MetaBundle &b)
void eventBundleAdded(const dtn::data::MetaBundle &b)
SimpleBundleStorage(const ibrcommon::File &workdir, const dtn::data::Length maxsize=0, const unsigned int buffer_limit=0)
DataStorage::istream retrieve(const Hash &hash)
virtual const std::string getName() const
virtual void eventDataStorageRemoveFailed(const dtn::storage::DataStorage::Hash &hash, const ibrcommon::Exception &)
void eventBundleRemoved(const dtn::data::BundleID &id)
virtual void setFaulty(bool mode)
const Hash store(Container *data)
iterator find(block_t blocktype)
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
virtual void componentDown()
virtual void iterateDataStorage(const dtn::storage::DataStorage::Hash &hash, dtn::storage::DataStorage::istream &stream)
void remove(const Hash &hash)
virtual ~SimpleBundleStorage()
static bool isExpired(const dtn::data::Timestamp &timestamp, const dtn::data::Number &lifetime)
virtual void store(const dtn::data::Bundle &bundle)