00001 #include "core/SimpleBundleStorage.h"
00002 #include "core/TimeEvent.h"
00003 #include "core/GlobalEvent.h"
00004 #include "core/BundleExpiredEvent.h"
00005 #include "core/BundleEvent.h"
00006
00007 #include <ibrdtn/utils/Utils.h>
00008 #include <ibrcommon/thread/MutexLock.h>
00009 #include <ibrcommon/Logger.h>
00010 #include <ibrcommon/AutoDelete.h>
00011
00012 #include <string.h>
00013 #include <stdlib.h>
00014 #include <iostream>
00015 #include <fstream>
00016
00017 namespace dtn
00018 {
00019 namespace core
00020 {
00021 SimpleBundleStorage::SimpleBundleStorage(size_t maxsize)
00022 : _running(true), _mode(MODE_NONPERSISTENT), _maxsize(maxsize), _currentsize(0)
00023 {
00024 }
00025
00026 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize)
00027 : _running(true), _workdir(workdir), _mode(MODE_PERSISTENT), _maxsize(maxsize), _currentsize(0)
00028 {
00029
00030 std::list<ibrcommon::File> files;
00031 _workdir.getFiles(files);
00032
00033 for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++)
00034 {
00035 ibrcommon::File &file = (*iter);
00036 if (!file.isDirectory() && !file.isSystem())
00037 {
00038 try {
00039
00040 load(file);
00041 } catch (ibrcommon::IOException ex) {
00042
00043 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00044
00045
00046 file.remove();
00047 } catch (dtn::InvalidDataException ex) {
00048
00049 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00050
00051
00052 file.remove();
00053 }
00054 }
00055 }
00056
00057
00058 IBRCOMMON_LOGGER(info) << _bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
00059 }
00060
00061 SimpleBundleStorage::~SimpleBundleStorage()
00062 {
00063 _tasks.abort();
00064 join();
00065 }
00066
00067 void SimpleBundleStorage::componentUp()
00068 {
00069 bindEvent(TimeEvent::className);
00070 }
00071
00072 void SimpleBundleStorage::componentDown()
00073 {
00074 unbindEvent(TimeEvent::className);
00075
00076 _tasks.abort();
00077 }
00078
00079 void SimpleBundleStorage::componentRun()
00080 {
00081 try {
00082 while (true)
00083 {
00084 Task *t = _tasks.getnpop(true);
00085 ibrcommon::AutoDelete<Task> ad(t);
00086
00087 try {
00088 t->run(*this);
00089 } catch (const std::exception &ex) {
00090 IBRCOMMON_LOGGER(error) << "error in storage: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00091 }
00092
00093 yield();
00094 }
00095 } catch (const std::exception &ex) {
00096 IBRCOMMON_LOGGER_DEBUG(10) << "SimpleBundleStorage exited with " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00097 }
00098 }
00099
00100 bool SimpleBundleStorage::__cancellation()
00101 {
00102 _tasks.abort();
00103 return true;
00104 }
00105
00106 void SimpleBundleStorage::raiseEvent(const Event *evt)
00107 {
00108 try {
00109 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt);
00110 if (time.getAction() == dtn::core::TIME_SECOND_TICK)
00111 {
00112 _tasks.push( new SimpleBundleStorage::TaskExpireBundles(time.getTimestamp()) );
00113 }
00114 } catch (const std::bad_cast&) { }
00115 }
00116
00117 const std::string SimpleBundleStorage::getName() const
00118 {
00119 return "SimpleBundleStorage";
00120 }
00121
00122 bool SimpleBundleStorage::empty()
00123 {
00124 ibrcommon::MutexLock l(_bundleslock);
00125 return _bundles.empty();
00126 }
00127
00128 void SimpleBundleStorage::releaseCustody(dtn::data::BundleID&)
00129 {
00130
00131
00132 }
00133
00134 unsigned int SimpleBundleStorage::count()
00135 {
00136 ibrcommon::MutexLock l(_bundleslock);
00137 return _bundles.size();
00138 }
00139
00140 const dtn::data::MetaBundle SimpleBundleStorage::getByFilter(const ibrcommon::BloomFilter &filter)
00141 {
00142
00143
00144 ibrcommon::MutexLock l(_bundleslock);
00145 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00146 {
00147 const BundleContainer &container = (*iter);
00148
00149 try {
00150 if ( !filter.contains(container.toString()) )
00151 {
00152
00153 if ( (_destination_filter.find(container.destination) == _destination_filter.end()) )
00154 {
00155 IBRCOMMON_LOGGER_DEBUG(10) << container.toString() << " is not in the bloomfilter" << IBRCOMMON_LOGGER_ENDL;
00156 return container;
00157 }
00158 }
00159 } catch (const dtn::InvalidDataException &ex) {
00160 IBRCOMMON_LOGGER_DEBUG(10) << "InvalidDataException on bundle get: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00161 }
00162 }
00163
00164 throw BundleStorage::NoBundleFoundException();
00165 }
00166
00167 const dtn::data::MetaBundle SimpleBundleStorage::getByDestination(const dtn::data::EID &eid, bool exact)
00168 {
00169 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: get bundle for " << eid.getString() << IBRCOMMON_LOGGER_ENDL;
00170
00171 ibrcommon::MutexLock l(_bundleslock);
00172 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00173 {
00174 const BundleContainer &bundle = (*iter);
00175
00176 try {
00177 if (exact)
00178 {
00179 if (bundle.destination == eid)
00180 {
00181 return bundle;
00182 }
00183 }
00184 else
00185 {
00186 if (bundle.destination.getNodeEID() == eid.getNodeEID())
00187 {
00188 return bundle;
00189 }
00190 }
00191 } catch (const dtn::InvalidDataException &ex) {
00192 IBRCOMMON_LOGGER_DEBUG(10) << "InvalidDataException on bundle get: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00193 }
00194 }
00195
00196 throw BundleStorage::NoBundleFoundException();
00197 }
00198
00199 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id)
00200 {
00201 ibrcommon::MutexLock l(_bundleslock);
00202 try {
00203 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00204 {
00205 const BundleContainer &bundle = (*iter);
00206 if (id == bundle)
00207 {
00208 return bundle.get();
00209 }
00210 }
00211 } catch (dtn::SerializationFailedException) {
00212
00213 }
00214
00215 throw BundleStorage::NoBundleFoundException();
00216 }
00217
00218 void SimpleBundleStorage::invokeExpiration(const size_t timestamp)
00219 {
00220 ibrcommon::MutexLock l(_bundleslock);
00221 dtn::data::BundleList::expire(timestamp);
00222 }
00223
00224 void SimpleBundleStorage::load(const ibrcommon::File &file)
00225 {
00226 ibrcommon::MutexLock l(_bundleslock);
00227
00228 BundleContainer container(file);
00229 _bundles.insert( container );
00230
00231
00232 _currentsize += container.size();
00233
00234
00235 dtn::data::BundleList::add(container);
00236 }
00237
00238 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle)
00239 {
00240 ibrcommon::MutexLock l(_bundleslock);
00241
00242
00243 BundleContainer container(bundle);
00244
00245
00246 if ((_maxsize > 0) && (_currentsize + container.size() > _maxsize))
00247 {
00248 throw StorageSizeExeededException();
00249 }
00250
00251
00252 _currentsize += container.size();
00253
00254
00255 if (_mode == MODE_PERSISTENT)
00256 {
00257 container = BundleContainer(bundle, _workdir, container.size());
00258
00259
00260 _tasks.push( new SimpleBundleStorage::TaskStoreBundle(container) );
00261 }
00262
00263
00264 pair<set<BundleContainer>::iterator,bool> ret = _bundles.insert( container );
00265
00266 if (ret.second)
00267 {
00268 dtn::data::BundleList::add(dtn::data::MetaBundle(bundle));
00269 }
00270 else
00271 {
00272 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: got bundle duplicate " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00273 }
00274 }
00275
00276 void SimpleBundleStorage::remove(const dtn::data::BundleID &id)
00277 {
00278 ibrcommon::MutexLock l(_bundleslock);
00279
00280 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00281 {
00282 if ( id == (*iter) )
00283 {
00284
00285 BundleContainer container = (*iter);
00286
00287
00288 dtn::data::BundleList::remove(container);
00289
00290
00291 _currentsize -= container.size();
00292
00293
00294 _tasks.push( new SimpleBundleStorage::TaskRemoveBundle(container) );
00295
00296
00297 _bundles.erase(iter);
00298
00299 return;
00300 }
00301 }
00302
00303 throw BundleStorage::NoBundleFoundException();
00304 }
00305
00306 dtn::data::MetaBundle SimpleBundleStorage::remove(const ibrcommon::BloomFilter &filter)
00307 {
00308 ibrcommon::MutexLock l(_bundleslock);
00309
00310 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00311 {
00312 const BundleContainer &container = (*iter);
00313
00314 if ( filter.contains(container.toString()) )
00315 {
00316
00317 BundleContainer container = (*iter);
00318
00319
00320 dtn::data::BundleList::remove(container);
00321
00322
00323 _currentsize -= container.size();
00324
00325
00326 _tasks.push( new SimpleBundleStorage::TaskRemoveBundle(container) );
00327
00328
00329 _bundles.erase(iter);
00330
00331 return (MetaBundle)container;
00332 }
00333 }
00334
00335 throw BundleStorage::NoBundleFoundException();
00336 }
00337
00338 size_t SimpleBundleStorage::size() const
00339 {
00340 return _currentsize;
00341 }
00342
00343 void SimpleBundleStorage::clear()
00344 {
00345 ibrcommon::MutexLock l(_bundleslock);
00346
00347
00348 for (std::set<BundleContainer>::iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00349 {
00350 BundleContainer container = (*iter);
00351
00352
00353 container.remove();
00354 }
00355
00356 _bundles.clear();
00357 dtn::data::BundleList::clear();
00358
00359
00360 _currentsize = 0;
00361 }
00362
00363 void SimpleBundleStorage::eventBundleExpired(const ExpiringBundle &b)
00364 {
00365 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00366 {
00367 if ( b.bundle == (*iter) )
00368 {
00369 BundleContainer container = (*iter);
00370
00371
00372 _currentsize -= container.size();
00373
00374 container.remove();
00375 _bundles.erase(iter);
00376 break;
00377 }
00378 }
00379
00380
00381 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00382
00383
00384 dtn::core::BundleExpiredEvent::raise( b.bundle );
00385 }
00386
00387 const std::list<dtn::data::BundleID> SimpleBundleStorage::getList()
00388 {
00389 ibrcommon::MutexLock l(_bundleslock);
00390 std::list<dtn::data::BundleID> ret;
00391
00392 for (std::set<BundleContainer>::const_iterator iter = _bundles.begin(); iter != _bundles.end(); iter++)
00393 {
00394 const BundleContainer &container = (*iter);
00395 ret.push_back( container );
00396 }
00397
00398 return ret;
00399 }
00400
00401 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
00402 : dtn::data::MetaBundle(b), _holder( new SimpleBundleStorage::BundleContainer::Holder(b) )
00403 {
00404 }
00405
00406 SimpleBundleStorage::BundleContainer::BundleContainer(const ibrcommon::File &file)
00407 : dtn::data::MetaBundle(), _holder( new SimpleBundleStorage::BundleContainer::Holder(file) )
00408 {
00409 dtn::data::Bundle b = _holder->getBundle();
00410 (dtn::data::MetaBundle&)(*this) = dtn::data::MetaBundle(b);
00411 }
00412
00413 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b, const ibrcommon::File &workdir, const size_t size)
00414 : dtn::data::MetaBundle(b), _holder( new SimpleBundleStorage::BundleContainer::Holder(b, workdir, size) )
00415 {
00416 }
00417
00418 SimpleBundleStorage::BundleContainer::~BundleContainer()
00419 {
00420 down();
00421 }
00422
00423 void SimpleBundleStorage::BundleContainer::down()
00424 {
00425 try {
00426 ibrcommon::MutexLock l(_holder->lock);
00427 if (--_holder->_count == 0) throw true;
00428 } catch (bool) {
00429 delete _holder;
00430 }
00431 }
00432
00433 size_t SimpleBundleStorage::BundleContainer::size() const
00434 {
00435 return _holder->size();
00436 }
00437
00438 bool SimpleBundleStorage::BundleContainer::operator<(const SimpleBundleStorage::BundleContainer& other) const
00439 {
00440 MetaBundle &left = (MetaBundle&)*this;
00441 MetaBundle &right = (MetaBundle&)other;
00442
00443 if (left.getPriority() > right.getPriority()) return true;
00444 if (left.getPriority() != right.getPriority()) return false;
00445 return (left < right);
00446 }
00447
00448 SimpleBundleStorage::BundleContainer& SimpleBundleStorage::BundleContainer::operator= (const SimpleBundleStorage::BundleContainer &right)
00449 {
00450 ibrcommon::MutexLock l(right._holder->lock);
00451 ++right._holder->_count;
00452
00453 down();
00454
00455 _holder = right._holder;
00456 return *this;
00457 }
00458
00459 SimpleBundleStorage::BundleContainer::BundleContainer(const SimpleBundleStorage::BundleContainer& right)
00460 : dtn::data::MetaBundle(right), _holder(right._holder)
00461 {
00462 ibrcommon::MutexLock l(_holder->lock);
00463 ++_holder->_count;
00464 }
00465
00466 dtn::data::Bundle SimpleBundleStorage::BundleContainer::get() const
00467 {
00468 return _holder->getBundle();
00469 }
00470
00471 void SimpleBundleStorage::BundleContainer::remove()
00472 {
00473 _holder->remove();
00474 }
00475
00476 void SimpleBundleStorage::BundleContainer::invokeStore()
00477 {
00478 _holder->invokeStore();
00479 }
00480
00481 SimpleBundleStorage::BundleContainer::Holder::Holder( const dtn::data::Bundle &b )
00482 : _count(1), _state(HOLDER_MEMORY), _bundle(b)
00483 {
00484 dtn::data::DefaultSerializer s(std::cout);
00485 _size = s.getLength(_bundle);
00486 }
00487
00488 SimpleBundleStorage::BundleContainer::Holder::Holder( const ibrcommon::File &file )
00489 : _count(1), _state(HOLDER_STORED), _bundle(), _container(file)
00490 {
00491 _size = file.size();
00492 }
00493
00494 SimpleBundleStorage::BundleContainer::Holder::Holder( const dtn::data::Bundle &b, const ibrcommon::File &workdir, const size_t size )
00495 : _count(1), _state(HOLDER_PENDING), _bundle(b), _container(workdir), _size(size)
00496 {
00497 }
00498
00499 SimpleBundleStorage::BundleContainer::Holder::~Holder()
00500 {
00501 ibrcommon::MutexLock l(_state_lock);
00502 if ((_state == HOLDER_DELETED) && (_container.exists()))
00503 {
00504 _container.remove();
00505 }
00506 }
00507
00508 size_t SimpleBundleStorage::BundleContainer::Holder::size() const
00509 {
00510 return _size;
00511 }
00512
00513 dtn::data::Bundle SimpleBundleStorage::BundleContainer::Holder::getBundle()
00514 {
00515 ibrcommon::MutexLock l(_state_lock);
00516 if (_state == HOLDER_DELETED)
00517 {
00518 throw dtn::SerializationFailedException("bundle deleted");
00519 }
00520 else if (_state == HOLDER_STORED)
00521 {
00522 dtn::data::Bundle bundle;
00523
00524
00525 ibrcommon::locked_ifstream ostream(_container, ios::in|ios::binary);
00526 std::istream &fs = (*ostream);
00527 try {
00528 fs.exceptions(std::ios::badbit | std::ios::eofbit);
00529 dtn::data::DefaultDeserializer(fs, dtn::core::BundleCore::getInstance()) >> bundle;
00530 } catch (ios_base::failure ex) {
00531 throw dtn::SerializationFailedException("can not load bundle data (" + std::string(ex.what()) + ")");
00532 } catch (const std::exception &ex) {
00533 throw dtn::SerializationFailedException("bundle get failed: " + std::string(ex.what()));
00534 }
00535
00536 ostream.close();
00537
00538 return bundle;
00539 }
00540 else
00541 {
00542 return _bundle;
00543 }
00544 }
00545
00546 void SimpleBundleStorage::BundleContainer::Holder::remove()
00547 {
00548 ibrcommon::MutexLock l(_state_lock);
00549 _state = HOLDER_DELETED;
00550 }
00551
00552 void SimpleBundleStorage::BundleContainer::Holder::invokeStore()
00553 {
00554 ibrcommon::MutexLock l(_state_lock);
00555 try {
00556 if (_state == HOLDER_PENDING)
00557 {
00558 _container = ibrcommon::TemporaryFile(_container, "bundle");
00559 ibrcommon::locked_ofstream ostream(_container);
00560 std::ostream &out = (*ostream);
00561
00562 if (!ostream.is_open()) throw ibrcommon::IOException("can not open file");
00563 dtn::data::DefaultSerializer(out) << _bundle; out << std::flush;
00564 ostream.close();
00565
00566 if (_container.size() == 0)
00567 {
00568 throw ibrcommon::IOException("Write of bundle failed. File has size of zero.");
00569 }
00570
00571 _size = _container.size();
00572 _bundle = dtn::data::Bundle();
00573 _state = HOLDER_STORED;
00574 }
00575 } catch (const std::exception &ex) {
00576 throw dtn::SerializationFailedException("can not write data to the storage; " + std::string(ex.what()));
00577 }
00578 }
00579
00580 SimpleBundleStorage::TaskStoreBundle::TaskStoreBundle(const SimpleBundleStorage::BundleContainer &container)
00581 : _container(container)
00582 { }
00583
00584 SimpleBundleStorage::TaskStoreBundle::~TaskStoreBundle()
00585 { }
00586
00587 void SimpleBundleStorage::TaskStoreBundle::run(SimpleBundleStorage &storage)
00588 {
00589 try {
00590 _container.invokeStore();
00591 IBRCOMMON_LOGGER_DEBUG(20) << "bundle stored " << _container.toString() << " (size: " << _container.size() << ")" << IBRCOMMON_LOGGER_ENDL;
00592 } catch (const dtn::SerializationFailedException &ex) {
00593 IBRCOMMON_LOGGER(error) << "failed to store bundle " << _container.toString() << " (size: " << _container.size() << "): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00594 } catch (const ibrcommon::IOException &ex) {
00595 IBRCOMMON_LOGGER(error) << "failed to store bundle " << _container.toString() << " (size: " << _container.size() << "): " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00596
00597 storage._tasks.push( new SimpleBundleStorage::TaskStoreBundle(_container) );
00598 }
00599 }
00600
00601 SimpleBundleStorage::TaskRemoveBundle::TaskRemoveBundle(const SimpleBundleStorage::BundleContainer &container)
00602 : _container(container)
00603 { }
00604
00605 SimpleBundleStorage::TaskRemoveBundle::~TaskRemoveBundle()
00606 { }
00607
00608 void SimpleBundleStorage::TaskRemoveBundle::run(SimpleBundleStorage&)
00609 {
00610
00611 _container.remove();
00612 }
00613
00614 SimpleBundleStorage::TaskExpireBundles::TaskExpireBundles(const size_t ×tamp)
00615 : _timestamp(timestamp)
00616 { }
00617
00618 SimpleBundleStorage::TaskExpireBundles::~TaskExpireBundles()
00619 { }
00620
00621 void SimpleBundleStorage::TaskExpireBundles::run(SimpleBundleStorage &storage)
00622 {
00623 storage.invokeExpiration(_timestamp);
00624 }
00625 }
00626 }