IBR-DTNSuite 0.6

daemon/src/core/SimpleBundleStorage.cpp

Go to the documentation of this file.
00001 #include "core/SimpleBundleStorage.h"
00002 #include "core/TimeEvent.h"
00003 #include "core/GlobalEvent.h"
00004 #include "core/BundleExpiredEvent.h"
00005 #include "core/BundleEvent.h"
00006 
00007 #include <ibrdtn/data/AgeBlock.h>
00008 #include <ibrdtn/utils/Utils.h>
00009 #include <ibrcommon/thread/MutexLock.h>
00010 #include <ibrcommon/Logger.h>
00011 #include <ibrcommon/AutoDelete.h>
00012 
00013 #include <string.h>
00014 #include <stdlib.h>
00015 #include <iostream>
00016 #include <fstream>
00017 #include <cstring>
00018 #include <cerrno>
00019 
00020 namespace dtn
00021 {
00022         namespace core
00023         {
00024                 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize, size_t buffer_limit)
00025                  : _datastore(*this, workdir, buffer_limit), _maxsize(maxsize), _currentsize(0)
00026                 {
00027                         // load persistent bundles
00028                         _datastore.iterateAll();
00029 
00030                         // some output
00031                         IBRCOMMON_LOGGER(info) << _bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
00032                 }
00033 
00034                 SimpleBundleStorage::~SimpleBundleStorage()
00035                 {
00036                 }
00037 
00038                 void SimpleBundleStorage::eventDataStorageStored(const dtn::core::DataStorage::Hash &hash)
00039                 {
00040                         ibrcommon::MutexLock l(_bundleslock);
00041                         dtn::data::MetaBundle meta(_pending_bundles[hash]);
00042                         _pending_bundles.erase(hash);
00043                         _stored_bundles[meta] = hash;
00044                 }
00045 
00046                 void SimpleBundleStorage::eventDataStorageStoreFailed(const dtn::core::DataStorage::Hash &hash, const ibrcommon::Exception &ex)
00047                 {
00048                         IBRCOMMON_LOGGER(error) << "store failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00049 
00050                         ibrcommon::MutexLock l(_bundleslock);
00051 
00052                         // get the reference to the bundle
00053                         const dtn::data::Bundle &b = _pending_bundles[hash];
00054 
00055                         // decrement the storage size
00056                         _currentsize -= _bundle_size[b];
00057 
00058                         // cleanup bundle sizes
00059                         _bundle_size.erase(b);
00060 
00061                         // delete the pending bundle
00062                         _pending_bundles.erase(hash);
00063                 }
00064 
00065                 void SimpleBundleStorage::eventDataStorageRemoved(const dtn::core::DataStorage::Hash &hash)
00066                 {
00067                         ibrcommon::MutexLock l(_bundleslock);
00068 
00069                         for (std::map<dtn::data::MetaBundle, DataStorage::Hash>::iterator iter = _stored_bundles.begin();
00070                                         iter != _stored_bundles.end(); iter++)
00071                         {
00072                                 if (iter->second == hash)
00073                                 {
00074                                         // decrement the storage size
00075                                         _currentsize -= _bundle_size[iter->first];
00076 
00077                                         _bundle_size.erase(iter->first);
00078 
00079                                         _stored_bundles.erase(iter);
00080                                         return;
00081                                 }
00082                         }
00083                 }
00084 
00085                 void SimpleBundleStorage::eventDataStorageRemoveFailed(const dtn::core::DataStorage::Hash&, const ibrcommon::Exception &ex)
00086                 {
00087                         IBRCOMMON_LOGGER(error) << "remove failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00088                 }
00089 
00090                 void SimpleBundleStorage::iterateDataStorage(const dtn::core::DataStorage::Hash &hash, dtn::core::DataStorage::istream &stream)
00091                 {
00092                         try {
00093                                 dtn::data::Bundle bundle;
00094                                 dtn::data::DefaultDeserializer ds(*stream);
00095 
00096                                 // load a bundle into the storage
00097                                 ds >> bundle;
00098 
00099                                 // extract meta data
00100                                 dtn::data::MetaBundle meta(bundle);
00101 
00102                                 // lock the bundle lists
00103                                 ibrcommon::MutexLock l(_bundleslock);
00104 
00105                                 // add the bundle to the stored bundles
00106                                 _stored_bundles[meta] = hash;
00107 
00108                                 // increment the storage size
00109                                 _bundle_size[meta] = (*stream).tellg();
00110                                 _currentsize += (*stream).tellg();
00111 
00112                                 // add it to the bundle list
00113                                 dtn::data::BundleList::add(meta);
00114                                 _priority_index.insert(meta);
00115 
00116                         } catch (const std::exception&) {
00117                                 // report this error to the console
00118                                 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << hash.value << IBRCOMMON_LOGGER_ENDL;
00119 
00120                                 // error while reading file
00121                                 _datastore.remove(hash);
00122                         }
00123                 }
00124 
00125                 dtn::data::Bundle SimpleBundleStorage::__get(const dtn::data::MetaBundle &meta)
00126                 {
00127                         DataStorage::Hash hash(meta.toString());
00128                         std::map<DataStorage::Hash, dtn::data::Bundle>::iterator it = _pending_bundles.find(hash);
00129 
00130                         if (_pending_bundles.end() != it)
00131                         {
00132                                 return it->second;
00133                         }
00134 
00135                         try {
00136                                 DataStorage::istream stream = _datastore.retrieve(hash);
00137 
00138                                 // load the bundle from the storage
00139                                 dtn::data::Bundle bundle;
00140 
00141                                 // load the bundle from file
00142                                 try {
00143                                         dtn::data::DefaultDeserializer(*stream) >> bundle;
00144                                 } catch (const std::exception &ex) {
00145                                         throw dtn::SerializationFailedException("bundle get failed: " + std::string(ex.what()));
00146                                 }
00147 
00148                                 try {
00149                                         dtn::data::AgeBlock &agebl = bundle.getBlock<dtn::data::AgeBlock>();
00150 
00151                                         // modify the AgeBlock with the age of the file
00152                                         time_t age = stream.lastaccess() - stream.lastmodify();
00153 
00154                                         agebl.addSeconds(age);
00155                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00156 
00157                                 return bundle;
00158                         } catch (const DataStorage::DataNotAvailableException&) {
00159                                 throw BundleStorage::NoBundleFoundException();
00160                         }
00161                 }
00162 
00163                 void SimpleBundleStorage::componentUp()
00164                 {
00165                         bindEvent(TimeEvent::className);
00166                         _datastore.start();
00167                 }
00168 
00169                 void SimpleBundleStorage::componentDown()
00170                 {
00171                         unbindEvent(TimeEvent::className);
00172                         _datastore.stop();
00173                         _datastore.join();
00174                 }
00175 
00176                 void SimpleBundleStorage::raiseEvent(const Event *evt)
00177                 {
00178                         try {
00179                                 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
00180                                 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00181                                 {
00182                                         ibrcommon::MutexLock l(_bundleslock);
00183                                         dtn::data::BundleList::expire(time.getTimestamp());
00184                                 }
00185                         } catch (const std::bad_cast&) { }
00186                 }
00187 
00188                 const std::string SimpleBundleStorage::getName() const
00189                 {
00190                         return "SimpleBundleStorage";
00191                 }
00192 
00193                 bool SimpleBundleStorage::empty()
00194                 {
00195                         ibrcommon::MutexLock l(_bundleslock);
00196                         return dtn::data::BundleList::empty();
00197                 }
00198 
00199                 void SimpleBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&)
00200                 {
00201                         // custody is successful transferred to another node.
00202                         // it is safe to delete this bundle now. (depending on the routing algorithm.)
00203                 }
00204 
00205                 unsigned int SimpleBundleStorage::count()
00206                 {
00207                         ibrcommon::MutexLock l(_bundleslock);
00208                         return dtn::data::BundleList::size();
00209                 }
00210 
00211                 const std::list<dtn::data::MetaBundle> SimpleBundleStorage::get(BundleFilterCallback &cb)
00212                 {
00213                         // result list
00214                         std::list<dtn::data::MetaBundle> result;
00215 
00216                         // we have to iterate through all bundles
00217                         ibrcommon::MutexLock l(_bundleslock);
00218 
00219                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && (result.size() < cb.limit()); iter++)
00220                         {
00221                                 const dtn::data::MetaBundle &meta = (*iter);
00222 
00223                                 if ( cb.shouldAdd(meta) )
00224                                 {
00225                                         result.push_back(meta);
00226                                 }
00227                         }
00228 
00229                         return result;
00230                 }
00231 
00232                 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id)
00233                 {
00234                         try {
00235                                 ibrcommon::MutexLock l(_bundleslock);
00236 
00237                                 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00238                                 {
00239                                         const dtn::data::MetaBundle &meta = (*iter);
00240                                         if (id == meta)
00241                                         {
00242                                                 return __get(meta);
00243                                         }
00244                                 }
00245                         } catch (const dtn::SerializationFailedException &ex) {
00246                                 // bundle loading failed
00247                                 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00248 
00249                                 // the bundle is broken, delete it
00250                                 remove(id);
00251 
00252                                 throw BundleStorage::BundleLoadException();
00253                         }
00254 
00255                         throw BundleStorage::NoBundleFoundException();
00256                 }
00257 
00258                 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle)
00259                 {
00260                         // get the bundle size
00261                         dtn::data::DefaultSerializer s(std::cout);
00262                         size_t bundle_size = s.getLength(bundle);
00263 
00264                         // store the bundle
00265                         BundleContainer *bc = new BundleContainer(bundle);
00266                         DataStorage::Hash hash(*bc);
00267 
00268                         // check if this container is too big for us.
00269                         {
00270                                 ibrcommon::MutexLock l(_bundleslock);
00271                                 if ((_maxsize > 0) && (_currentsize + bundle_size > _maxsize))
00272                                 {
00273                                         throw StorageSizeExeededException();
00274                                 }
00275 
00276                                 // create meta data object
00277                                 dtn::data::MetaBundle meta(bundle);
00278 
00279                                 // add the bundle to the stored bundles
00280                                 _pending_bundles[hash] = bundle;
00281 
00282                                 // increment the storage size
00283                                 _bundle_size[meta] = bundle_size;
00284                                 _currentsize += bundle_size;
00285 
00286                                 // add it to the bundle list
00287                                 dtn::data::BundleList::add(meta);
00288                                 _priority_index.insert(meta);
00289                         }
00290 
00291                         // put the bundle into the data store
00292                         _datastore.store(hash, bc);
00293                 }
00294 
00295                 void SimpleBundleStorage::remove(const dtn::data::BundleID &id)
00296                 {
00297                         ibrcommon::MutexLock l(_bundleslock);
00298 
00299                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00300                         {
00301                                 if ((*iter) == id)
00302                                 {
00303                                         // remove item in the bundlelist
00304                                         dtn::data::MetaBundle meta = (*iter);
00305 
00306                                         // remove it from the bundle list
00307                                         dtn::data::BundleList::remove(meta);
00308                                         _priority_index.erase(meta);
00309 
00310                                         DataStorage::Hash hash(meta.toString());
00311 
00312                                         // create a background task for removing the bundle
00313                                         _datastore.remove(hash);
00314 
00315                                         return;
00316                                 }
00317                         }
00318 
00319                         throw BundleStorage::NoBundleFoundException();
00320                 }
00321 
00322                 dtn::data::MetaBundle SimpleBundleStorage::remove(const ibrcommon::BloomFilter &filter)
00323                 {
00324                         ibrcommon::MutexLock l(_bundleslock);
00325 
00326                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00327                         {
00328                                 // remove item in the bundlelist
00329                                 const dtn::data::MetaBundle meta = (*iter);
00330 
00331                                 if ( filter.contains(meta.toString()) )
00332                                 {
00333                                         // remove it from the bundle list
00334                                         dtn::data::BundleList::remove(meta);
00335                                         _priority_index.erase(meta);
00336 
00337                                         DataStorage::Hash hash(meta.toString());
00338 
00339                                         // create a background task for removing the bundle
00340                                         _datastore.remove(hash);
00341 
00342                                         return meta;
00343                                 }
00344                         }
00345 
00346                         throw BundleStorage::NoBundleFoundException();
00347                 }
00348 
00349                 size_t SimpleBundleStorage::size() const
00350                 {
00351                         return _currentsize;
00352                 }
00353 
00354                 void SimpleBundleStorage::clear()
00355                 {
00356                         ibrcommon::MutexLock l(_bundleslock);
00357 
00358                         // mark all bundles for deletion
00359                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00360                         {
00361                                 // remove item in the bundlelist
00362                                 const dtn::data::MetaBundle meta = (*iter);
00363 
00364                                 DataStorage::Hash hash(meta.toString());
00365 
00366                                 // create a background task for removing the bundle
00367                                 _datastore.remove(hash);
00368                         }
00369 
00370                         _bundles.clear();
00371                         _priority_index.clear();
00372                         dtn::data::BundleList::clear();
00373 
00374                         // set the storage size to zero
00375                         _currentsize = 0;
00376                 }
00377 
00378                 void SimpleBundleStorage::eventBundleExpired(const ExpiringBundle &b)
00379                 {
00380                         for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00381                         {
00382                                 if ((*iter) == b.bundle)
00383                                 {
00384                                         // remove the bundle
00385                                         const dtn::data::MetaBundle &meta = (*iter);
00386 
00387                                         DataStorage::Hash hash(meta.toString());
00388 
00389                                         // create a background task for removing the bundle
00390                                         _datastore.remove(hash);
00391 
00392                                         // remove the bundle off the index
00393                                         _priority_index.erase(meta);
00394 
00395                                         break;
00396                                 }
00397                         }
00398 
00399                         // raise bundle event
00400                         dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00401 
00402                         // raise an event
00403                         dtn::core::BundleExpiredEvent::raise( b.bundle );
00404                 }
00405 
00406                 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
00407                  : _bundle(b)
00408                 { }
00409 
00410                 SimpleBundleStorage::BundleContainer::~BundleContainer()
00411                 { }
00412 
00413                 std::string SimpleBundleStorage::BundleContainer::getKey() const
00414                 {
00415                         return dtn::data::BundleID(_bundle).toString();
00416                 }
00417 
00418                 std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
00419                 {
00420                         // get an serializer for bundles
00421                         dtn::data::DefaultSerializer s(stream);
00422 
00423                         // length of the bundle
00424                         unsigned int size = s.getLength(_bundle);
00425 
00426                         // serialize the bundle
00427                         s << _bundle; stream.flush();
00428 
00429                         // check the streams health
00430                         if (!stream.good())
00431                         {
00432                                 std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]";
00433                                 throw dtn::SerializationFailedException(ss.str());
00434                         }
00435 
00436                         // get the write position
00437                         if (size > stream.tellp())
00438                         {
00439                                 std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]";
00440                                 throw dtn::SerializationFailedException(ss.str());
00441                         }
00442 
00443                         // return the stream, this allows stacking
00444                         return stream;
00445                 }
00446         }
00447 }