Go to the documentation of this file.00001 #include "core/SimpleBundleStorage.h"
00002 #include "core/TimeEvent.h"
00003 #include "core/GlobalEvent.h"
00004 #include "core/BundleExpiredEvent.h"
00005 #include "core/BundleEvent.h"
00006
00007 #include <ibrdtn/data/AgeBlock.h>
00008 #include <ibrdtn/utils/Utils.h>
00009 #include <ibrcommon/thread/MutexLock.h>
00010 #include <ibrcommon/Logger.h>
00011 #include <ibrcommon/AutoDelete.h>
00012
00013 #include <string.h>
00014 #include <stdlib.h>
00015 #include <iostream>
00016 #include <fstream>
00017 #include <cstring>
00018 #include <cerrno>
00019
00020 namespace dtn
00021 {
00022 namespace core
00023 {
00024 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize, size_t buffer_limit)
00025 : _datastore(*this, workdir, buffer_limit), _maxsize(maxsize), _currentsize(0)
00026 {
00027
00028 _datastore.iterateAll();
00029
00030
00031 IBRCOMMON_LOGGER(info) << _bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
00032 }
00033
00034 SimpleBundleStorage::~SimpleBundleStorage()
00035 {
00036 }
00037
00038 void SimpleBundleStorage::eventDataStorageStored(const dtn::core::DataStorage::Hash &hash)
00039 {
00040 ibrcommon::MutexLock l(_bundleslock);
00041 dtn::data::MetaBundle meta(_pending_bundles[hash]);
00042 _pending_bundles.erase(hash);
00043 _stored_bundles[meta] = hash;
00044 }
00045
00046 void SimpleBundleStorage::eventDataStorageStoreFailed(const dtn::core::DataStorage::Hash &hash, const ibrcommon::Exception &ex)
00047 {
00048 IBRCOMMON_LOGGER(error) << "store failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00049
00050 ibrcommon::MutexLock l(_bundleslock);
00051
00052
00053 const dtn::data::Bundle &b = _pending_bundles[hash];
00054
00055
00056 _currentsize -= _bundle_size[b];
00057
00058
00059 _bundle_size.erase(b);
00060
00061
00062 _pending_bundles.erase(hash);
00063 }
00064
00065 void SimpleBundleStorage::eventDataStorageRemoved(const dtn::core::DataStorage::Hash &hash)
00066 {
00067 ibrcommon::MutexLock l(_bundleslock);
00068
00069 for (std::map<dtn::data::MetaBundle, DataStorage::Hash>::iterator iter = _stored_bundles.begin();
00070 iter != _stored_bundles.end(); iter++)
00071 {
00072 if (iter->second == hash)
00073 {
00074
00075 _currentsize -= _bundle_size[iter->first];
00076
00077 _bundle_size.erase(iter->first);
00078
00079 _stored_bundles.erase(iter);
00080 return;
00081 }
00082 }
00083 }
00084
00085 void SimpleBundleStorage::eventDataStorageRemoveFailed(const dtn::core::DataStorage::Hash&, const ibrcommon::Exception &ex)
00086 {
00087 IBRCOMMON_LOGGER(error) << "remove failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00088 }
00089
00090 void SimpleBundleStorage::iterateDataStorage(const dtn::core::DataStorage::Hash &hash, dtn::core::DataStorage::istream &stream)
00091 {
00092 try {
00093 dtn::data::Bundle bundle;
00094 dtn::data::DefaultDeserializer ds(*stream);
00095
00096
00097 ds >> bundle;
00098
00099
00100 dtn::data::MetaBundle meta(bundle);
00101
00102
00103 ibrcommon::MutexLock l(_bundleslock);
00104
00105
00106 _stored_bundles[meta] = hash;
00107
00108
00109 _bundle_size[meta] = (*stream).tellg();
00110 _currentsize += (*stream).tellg();
00111
00112
00113 dtn::data::BundleList::add(meta);
00114 _priority_index.insert(meta);
00115
00116 } catch (const std::exception&) {
00117
00118 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << hash.value << IBRCOMMON_LOGGER_ENDL;
00119
00120
00121 _datastore.remove(hash);
00122 }
00123 }
00124
00125 dtn::data::Bundle SimpleBundleStorage::__get(const dtn::data::MetaBundle &meta)
00126 {
00127 DataStorage::Hash hash(meta.toString());
00128 std::map<DataStorage::Hash, dtn::data::Bundle>::iterator it = _pending_bundles.find(hash);
00129
00130 if (_pending_bundles.end() != it)
00131 {
00132 return it->second;
00133 }
00134
00135 try {
00136 DataStorage::istream stream = _datastore.retrieve(hash);
00137
00138
00139 dtn::data::Bundle bundle;
00140
00141
00142 try {
00143 dtn::data::DefaultDeserializer(*stream) >> bundle;
00144 } catch (const std::exception &ex) {
00145 throw dtn::SerializationFailedException("bundle get failed: " + std::string(ex.what()));
00146 }
00147
00148 try {
00149 dtn::data::AgeBlock &agebl = bundle.getBlock<dtn::data::AgeBlock>();
00150
00151
00152 time_t age = stream.lastaccess() - stream.lastmodify();
00153
00154 agebl.addAge(age);
00155 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { };
00156
00157 return bundle;
00158 } catch (const DataStorage::DataNotAvailableException&) {
00159 throw BundleStorage::NoBundleFoundException();
00160 }
00161 }
00162
00163 void SimpleBundleStorage::componentUp()
00164 {
00165 bindEvent(TimeEvent::className);
00166 _datastore.start();
00167 }
00168
00169 void SimpleBundleStorage::componentDown()
00170 {
00171 unbindEvent(TimeEvent::className);
00172 _datastore.stop();
00173 _datastore.join();
00174 }
00175
00176 void SimpleBundleStorage::raiseEvent(const Event *evt)
00177 {
00178 try {
00179 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
00180 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00181 {
00182 ibrcommon::MutexLock l(_bundleslock);
00183 dtn::data::BundleList::expire(time.getTimestamp());
00184 }
00185 } catch (const std::bad_cast&) { }
00186 }
00187
00188 const std::string SimpleBundleStorage::getName() const
00189 {
00190 return "SimpleBundleStorage";
00191 }
00192
00193 bool SimpleBundleStorage::empty()
00194 {
00195 ibrcommon::MutexLock l(_bundleslock);
00196 return dtn::data::BundleList::empty();
00197 }
00198
00199 void SimpleBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&)
00200 {
00201
00202
00203 }
00204
00205 unsigned int SimpleBundleStorage::count()
00206 {
00207 ibrcommon::MutexLock l(_bundleslock);
00208 return dtn::data::BundleList::size();
00209 }
00210
00211 const std::list<dtn::data::MetaBundle> SimpleBundleStorage::get(BundleFilterCallback &cb)
00212 {
00213
00214 std::list<dtn::data::MetaBundle> result;
00215
00216
00217 ibrcommon::MutexLock l(_bundleslock);
00218
00219 for (std::set<dtn::data::MetaBundle>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && (result.size() < cb.limit()); iter++)
00220 {
00221 const dtn::data::MetaBundle &meta = (*iter);
00222
00223 if ( cb.shouldAdd(meta) )
00224 {
00225 result.push_back(meta);
00226 }
00227 }
00228
00229 return result;
00230 }
00231
00232 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id)
00233 {
00234 try {
00235 ibrcommon::MutexLock l(_bundleslock);
00236
00237 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00238 {
00239 const dtn::data::MetaBundle &meta = (*iter);
00240 if (id == meta)
00241 {
00242 return __get(meta);
00243 }
00244 }
00245 } catch (const dtn::SerializationFailedException &ex) {
00246
00247 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00248
00249
00250 remove(id);
00251
00252 throw BundleStorage::BundleLoadException();
00253 }
00254
00255 throw BundleStorage::NoBundleFoundException();
00256 }
00257
00258 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle)
00259 {
00260
00261 dtn::data::DefaultSerializer s(std::cout);
00262 size_t bundle_size = s.getLength(bundle);
00263
00264
00265 BundleContainer *bc = new BundleContainer(bundle);
00266 DataStorage::Hash hash(*bc);
00267
00268
00269 {
00270 ibrcommon::MutexLock l(_bundleslock);
00271 if ((_maxsize > 0) && (_currentsize + bundle_size > _maxsize))
00272 {
00273 throw StorageSizeExeededException();
00274 }
00275
00276
00277 dtn::data::MetaBundle meta(bundle);
00278
00279
00280 _pending_bundles[hash] = bundle;
00281
00282
00283 _bundle_size[meta] = bundle_size;
00284 _currentsize += bundle_size;
00285
00286
00287 dtn::data::BundleList::add(meta);
00288 _priority_index.insert(meta);
00289 }
00290
00291
00292 _datastore.store(hash, bc);
00293 }
00294
00295 void SimpleBundleStorage::remove(const dtn::data::BundleID &id)
00296 {
00297 ibrcommon::MutexLock l(_bundleslock);
00298
00299 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00300 {
00301 if ((*iter) == id)
00302 {
00303
00304 dtn::data::MetaBundle meta = (*iter);
00305
00306
00307 dtn::data::BundleList::remove(meta);
00308 _priority_index.erase(meta);
00309
00310 DataStorage::Hash hash(meta.toString());
00311
00312
00313 _datastore.remove(hash);
00314
00315 return;
00316 }
00317 }
00318
00319 throw BundleStorage::NoBundleFoundException();
00320 }
00321
00322 dtn::data::MetaBundle SimpleBundleStorage::remove(const ibrcommon::BloomFilter &filter)
00323 {
00324 ibrcommon::MutexLock l(_bundleslock);
00325
00326 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00327 {
00328
00329 const dtn::data::MetaBundle meta = (*iter);
00330
00331 if ( filter.contains(meta.toString()) )
00332 {
00333
00334 dtn::data::BundleList::remove(meta);
00335 _priority_index.erase(meta);
00336
00337 DataStorage::Hash hash(meta.toString());
00338
00339
00340 _datastore.remove(hash);
00341
00342 return meta;
00343 }
00344 }
00345
00346 throw BundleStorage::NoBundleFoundException();
00347 }
00348
00349 size_t SimpleBundleStorage::size() const
00350 {
00351 return _currentsize;
00352 }
00353
00354 void SimpleBundleStorage::clear()
00355 {
00356 ibrcommon::MutexLock l(_bundleslock);
00357
00358
00359 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00360 {
00361
00362 const dtn::data::MetaBundle meta = (*iter);
00363
00364 DataStorage::Hash hash(meta.toString());
00365
00366
00367 _datastore.remove(hash);
00368 }
00369
00370 _bundles.clear();
00371 _priority_index.clear();
00372 dtn::data::BundleList::clear();
00373
00374
00375 _currentsize = 0;
00376 }
00377
00378 void SimpleBundleStorage::eventBundleExpired(const ExpiringBundle &b)
00379 {
00380 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++)
00381 {
00382 if ((*iter) == b.bundle)
00383 {
00384
00385 const dtn::data::MetaBundle &meta = (*iter);
00386
00387 DataStorage::Hash hash(meta.toString());
00388
00389
00390 _datastore.remove(hash);
00391
00392
00393 _priority_index.erase(meta);
00394
00395 break;
00396 }
00397 }
00398
00399
00400 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00401
00402
00403 dtn::core::BundleExpiredEvent::raise( b.bundle );
00404 }
00405
00406 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
00407 : _bundle(b)
00408 { }
00409
00410 SimpleBundleStorage::BundleContainer::~BundleContainer()
00411 { }
00412
00413 std::string SimpleBundleStorage::BundleContainer::getKey() const
00414 {
00415 return dtn::data::BundleID(_bundle).toString();
00416 }
00417
00418 std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
00419 {
00420
00421 dtn::data::DefaultSerializer s(stream);
00422
00423
00424 unsigned int size = s.getLength(_bundle);
00425
00426
00427 s << _bundle; stream.flush();
00428
00429
00430 if (!stream.good())
00431 {
00432 std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]";
00433 throw dtn::SerializationFailedException(ss.str());
00434 }
00435
00436
00437 if (size > stream.tellp())
00438 {
00439 std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]";
00440 throw dtn::SerializationFailedException(ss.str());
00441 }
00442
00443
00444 return stream;
00445 }
00446 }
00447 }