#include "core/SimpleBundleStorage.h"
#include "core/TimeEvent.h"
#include "core/GlobalEvent.h"
#include "core/BundleExpiredEvent.h"
#include "core/BundleEvent.h"

#include <ibrdtn/utils/Utils.h>
#include <ibrcommon/thread/MutexLock.h>
#include <ibrcommon/Logger.h>
#include <ibrcommon/AutoDelete.h>

#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <exception>
#include <iostream>
#include <fstream>
#include <vector>
00016
00017 namespace dtn
00018 {
00019 namespace core
00020 {
00021 SimpleBundleStorage::SimpleBundleStorage(size_t maxsize)
00022 : _store(maxsize), _running(true)
00023 {
00024 bindEvent(TimeEvent::className);
00025 bindEvent(GlobalEvent::className);
00026 }
00027
00028 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize)
00029 : _store(workdir, maxsize), _running(true)
00030 {
00031 }
00032
00033 SimpleBundleStorage::~SimpleBundleStorage()
00034 {
00035 }
00036
00037 void SimpleBundleStorage::componentUp()
00038 {
00039 bindEvent(TimeEvent::className);
00040 }
00041
00042 void SimpleBundleStorage::componentDown()
00043 {
00044 unbindEvent(TimeEvent::className);
00045
00046 _tasks.unblock();
00047 }
00048
00049 void SimpleBundleStorage::componentRun()
00050 {
00051 try {
00052 while (isRunning())
00053 {
00054 Task *t = _tasks.blockingpop();
00055 ibrcommon::AutoDelete<Task> ad(t);
00056
00057 try {
00058 t->run();
00059 } catch (...) {
00060
00061 }
00062 }
00063 } catch (ibrcommon::Exception) {
00064
00065 }
00066 }
00067
00068 void SimpleBundleStorage::raiseEvent(const Event *evt)
00069 {
00070 const TimeEvent *time = dynamic_cast<const TimeEvent*>(evt);
00071
00072 if (time != NULL)
00073 {
00074 if (time->getAction() == dtn::core::TIME_SECOND_TICK)
00075 {
00076 _store.expire(time->getTimestamp());
00077 }
00078 }
00079 }
00080
00081 void SimpleBundleStorage::clear()
00082 {
00083
00084 _store.clear();
00085 }
00086
00087 bool SimpleBundleStorage::empty()
00088 {
00089 ibrcommon::MutexLock l(_store.bundleslock);
00090 return _store.bundles.empty();
00091 }
00092
00093 unsigned int SimpleBundleStorage::count()
00094 {
00095 return _store.count();
00096 }
00097
00098 size_t SimpleBundleStorage::size() const
00099 {
00100 return _store.size();
00101 }
00102
00103 void SimpleBundleStorage::releaseCustody(dtn::data::BundleID&)
00104 {
00105
00106
00107 }
00108
00109 unsigned int SimpleBundleStorage::BundleStore::count()
00110 {
00111 ibrcommon::MutexLock l(bundleslock);
00112 return bundles.size();
00113 }
00114
00115 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle)
00116 {
00117 _store.store(bundle, *this);
00118 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: stored bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00119 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_STORED);
00120 }
00121
00122 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::EID &eid)
00123 {
00124 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: get bundle for " << eid.getString() << IBRCOMMON_LOGGER_ENDL;
00125 return _store.get(eid);
00126 }
00127
00128 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id)
00129 {
00130 return _store.get(id);
00131 }
00132
00133 dtn::data::Bundle SimpleBundleStorage::get(const ibrcommon::BloomFilter &filter)
00134 {
00135 return _store.get(filter);
00136 }
00137
00138 const std::list<dtn::data::BundleID> SimpleBundleStorage::getList()
00139 {
00140 return _store.getList();
00141 }
00142
00143 void SimpleBundleStorage::remove(const dtn::data::BundleID &id)
00144 {
00145 _store.remove(id);
00146 }
00147
00148 SimpleBundleStorage::BundleStore::BundleStore(size_t maxsize)
00149 : _mode(MODE_NONPERSISTENT), _maxsize(maxsize), _currentsize(0)
00150 {
00151 }
00152
00153 SimpleBundleStorage::BundleStore::BundleStore(ibrcommon::File workdir, size_t maxsize)
00154 : _workdir(workdir), _mode(MODE_PERSISTENT), _maxsize(maxsize), _currentsize(0)
00155 {
00156
00157 std::list<ibrcommon::File> files;
00158 _workdir.getFiles(files);
00159
00160 for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++)
00161 {
00162 ibrcommon::File &file = (*iter);
00163 if (!file.isDirectory() && !file.isSystem())
00164 {
00165 try {
00166
00167 load(file);
00168 } catch (ibrcommon::IOException ex) {
00169
00170 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00171
00172
00173 file.remove();
00174 } catch (dtn::InvalidDataException ex) {
00175
00176 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00177
00178
00179 file.remove();
00180 }
00181 }
00182 }
00183
00184
00185 IBRCOMMON_LOGGER(info) << bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
00186 }
00187
00188 SimpleBundleStorage::BundleStore::~BundleStore()
00189 {
00190 }
00191
00192 dtn::data::Bundle SimpleBundleStorage::BundleStore::get(const ibrcommon::BloomFilter &filter)
00193 {
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206
00207 ibrcommon::MutexLock l(bundleslock);
00208 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00209 {
00210 const BundleContainer &bundle = (*iter);
00211 const BundleID id(*bundle);
00212
00213 if (!filter.contains(id.toString()))
00214 {
00215 IBRCOMMON_LOGGER_DEBUG(10) << id.toString() << " is not in the bloomfilter" << IBRCOMMON_LOGGER_ENDL;
00216 return (*bundle);
00217 }
00218 }
00219
00220 throw BundleStorage::NoBundleFoundException();
00221 }
00222
00223 dtn::data::Bundle SimpleBundleStorage::BundleStore::get(const dtn::data::EID &eid)
00224 {
00225 ibrcommon::MutexLock l(bundleslock);
00226 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00227 {
00228 const BundleContainer &bundle = (*iter);
00229 if ((*bundle)._destination == eid)
00230 {
00231 return (*bundle);
00232 }
00233 }
00234
00235 throw BundleStorage::NoBundleFoundException();
00236 }
00237
00238 dtn::data::Bundle SimpleBundleStorage::BundleStore::get(const dtn::data::BundleID &id)
00239 {
00240 ibrcommon::MutexLock l(bundleslock);
00241
00242 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00243 {
00244 const BundleContainer &bundle = (*iter);
00245 if (id == (*bundle))
00246 {
00247 return (*bundle);
00248 }
00249 }
00250
00251 throw dtn::core::BundleStorage::NoBundleFoundException();
00252 }
00253
00254 void SimpleBundleStorage::BundleStore::expire(const size_t timestamp)
00255 {
00256 ibrcommon::MutexLock l(bundleslock);
00257 dtn::data::BundleList::expire(timestamp);
00258 }
00259
00260 void SimpleBundleStorage::BundleStore::load(const ibrcommon::File &file)
00261 {
00262 ibrcommon::MutexLock l(bundleslock);
00263
00264 BundleContainer container(file);
00265 bundles.insert( container );
00266
00267
00268 _currentsize += container.size();
00269
00270
00271 dtn::data::BundleList::add(dtn::data::MetaBundle(*container));
00272 }
00273
00274 void SimpleBundleStorage::BundleStore::store(const dtn::data::Bundle &bundle, SimpleBundleStorage &storage)
00275 {
00276 ibrcommon::MutexLock l(bundleslock);
00277
00278 pair<set<BundleContainer>::iterator,bool> ret;
00279
00280
00281 BundleContainer container(bundle);
00282
00283
00284 if ((_maxsize > 0) && (_currentsize + container.size() > _maxsize))
00285 {
00286 throw StorageSizeExeededException();
00287 }
00288
00289
00290 _currentsize += container.size();
00291
00292
00293 if (_mode == MODE_PERSISTENT)
00294 {
00295 container = BundleContainer(bundle, _workdir, container.size());
00296
00297
00298 storage._tasks.push( new SimpleBundleStorage::TaskStoreBundle(container) );
00299 }
00300
00301
00302 ret = bundles.insert( container );
00303
00304 if (ret.second)
00305 {
00306 dtn::data::BundleList::add(dtn::data::MetaBundle(bundle));
00307 }
00308 else
00309 {
00310 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: got bundle duplicate " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00311 }
00312 }
00313
00314 void SimpleBundleStorage::BundleStore::remove(const dtn::data::BundleID &id)
00315 {
00316 ibrcommon::MutexLock l(bundleslock);
00317
00318 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00319 {
00320 if ( id == (*(*iter)) )
00321 {
00322
00323 BundleContainer container = (*iter);
00324
00325
00326 dtn::data::BundleList::remove(dtn::data::MetaBundle(*container));
00327
00328
00329 _currentsize -= container.size();
00330
00331
00332 container.remove();
00333
00334
00335 bundles.erase(iter);
00336
00337 return;
00338 }
00339 }
00340
00341 throw BundleStorage::NoBundleFoundException();
00342 }
00343
00344 size_t SimpleBundleStorage::BundleStore::size() const
00345 {
00346 return _currentsize;
00347 }
00348
00349 void SimpleBundleStorage::BundleStore::clear()
00350 {
00351 ibrcommon::MutexLock l(bundleslock);
00352
00353
00354 for (std::set<BundleContainer>::iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00355 {
00356 BundleContainer container = (*iter);
00357
00358
00359 container.remove();
00360 }
00361
00362 bundles.clear();
00363 dtn::data::BundleList::clear();
00364
00365
00366 _currentsize = 0;
00367 }
00368
00369 void SimpleBundleStorage::BundleStore::eventBundleExpired(const ExpiringBundle &b)
00370 {
00371 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00372 {
00373 if ( b.bundle == (*(*iter)) )
00374 {
00375 BundleContainer container = (*iter);
00376
00377
00378 _currentsize -= container.size();
00379
00380 container.remove();
00381 bundles.erase(iter);
00382 break;
00383 }
00384 }
00385
00386
00387 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00388
00389
00390 dtn::core::BundleExpiredEvent::raise( b.bundle );
00391 }
00392
00393 const std::list<dtn::data::BundleID> SimpleBundleStorage::BundleStore::getList()
00394 {
00395 ibrcommon::MutexLock l(bundleslock);
00396 std::list<dtn::data::BundleID> ret;
00397
00398 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00399 {
00400 const BundleContainer &container = (*iter);
00401 ret.push_back( dtn::data::BundleID(*container) );
00402 }
00403
00404 return ret;
00405 }
00406
00407 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
00408 : _holder( new SimpleBundleStorage::BundleContainer::Holder(b) )
00409 {
00410 }
00411
00412 SimpleBundleStorage::BundleContainer::BundleContainer(const ibrcommon::File &file)
00413 : _holder( new SimpleBundleStorage::BundleContainer::Holder(file) )
00414 {
00415 }
00416
00417 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b, const ibrcommon::File &workdir, const size_t size)
00418 : _holder( new SimpleBundleStorage::BundleContainer::Holder(b, workdir, size) )
00419 {
00420 }
00421
00422 SimpleBundleStorage::BundleContainer::~BundleContainer()
00423 {
00424 if (--_holder->_count == 0) delete _holder;
00425 }
00426
00427 size_t SimpleBundleStorage::BundleContainer::size() const
00428 {
00429 return _holder->size();
00430 }
00431
00432 bool SimpleBundleStorage::BundleContainer::operator<(const SimpleBundleStorage::BundleContainer& other) const
00433 {
00434 return (*(*this) < *other);
00435 }
00436
00437 bool SimpleBundleStorage::BundleContainer::operator>(const SimpleBundleStorage::BundleContainer& other) const
00438 {
00439 return (*(*this) > *other);
00440 }
00441
00442 bool SimpleBundleStorage::BundleContainer::operator!=(const SimpleBundleStorage::BundleContainer& other) const
00443 {
00444 return !((*this) == other);
00445 }
00446
00447 bool SimpleBundleStorage::BundleContainer::operator==(const SimpleBundleStorage::BundleContainer& other) const
00448 {
00449 return (*(*this) == *other);
00450 }
00451
00452 SimpleBundleStorage::BundleContainer& SimpleBundleStorage::BundleContainer::operator= (const SimpleBundleStorage::BundleContainer &right)
00453 {
00454 ++right._holder->_count;
00455 if (--_holder->_count == 0) delete _holder;
00456 _holder = right._holder;
00457 return *this;
00458 }
00459
00460 SimpleBundleStorage::BundleContainer& SimpleBundleStorage::BundleContainer::operator= (SimpleBundleStorage::BundleContainer &right)
00461 {
00462 ++right._holder->_count;
00463 if (--_holder->_count == 0) delete _holder;
00464 _holder = right._holder;
00465 return *this;
00466 }
00467
00468 SimpleBundleStorage::BundleContainer::BundleContainer(const SimpleBundleStorage::BundleContainer& right)
00469 : BundleID(right), _holder(right._holder)
00470 {
00471 ++_holder->_count;
00472 }
00473
00474 dtn::data::Bundle& SimpleBundleStorage::BundleContainer::operator*()
00475 {
00476 return _holder->_bundle;
00477 }
00478
00479 const dtn::data::Bundle& SimpleBundleStorage::BundleContainer::operator*() const
00480 {
00481 return _holder->_bundle;
00482 }
00483
00484 void SimpleBundleStorage::BundleContainer::remove()
00485 {
00486 _holder->deletion = true;
00487 }
00488
00489 void SimpleBundleStorage::BundleContainer::invokeStore()
00490 {
00491 _holder->invokeStore();
00492 }
00493
00494 SimpleBundleStorage::BundleContainer::Holder::Holder( const dtn::data::Bundle &b )
00495 : _bundle(b), _mode(MODE_NONPERSISTENT), _count(1), deletion(false)
00496 {
00497 dtn::data::DefaultSerializer s(std::cout);
00498 _size = s.getLength(_bundle);
00499 }
00500
00501 SimpleBundleStorage::BundleContainer::Holder::Holder( const ibrcommon::File &file )
00502 : _bundle(), _container(file), _mode(MODE_PERSISTENT), _count(1), deletion(false)
00503 {
00504 std::fstream fs(file.getPath().c_str(), ios::in|ios::binary);
00505 _size = file.size();
00506
00507 try {
00508 fs.exceptions(std::ios::badbit | std::ios::failbit | std::ios::eofbit);
00509 dtn::data::DefaultDeserializer(fs, dtn::core::BundleCore::getInstance()) >> _bundle;
00510 } catch (ios_base::failure ex) {
00511 throw dtn::SerializationFailedException("can not load bundle data" + std::string(ex.what()));
00512 }
00513
00514 fs.close();
00515 }
00516
00517 SimpleBundleStorage::BundleContainer::Holder::Holder( const dtn::data::Bundle &b, const ibrcommon::File &workdir, const size_t size )
00518 : _bundle(b), _mode(MODE_PERSISTENT), _count(1), deletion(false), _size(size)
00519 {
00520 std::string namestr = workdir.getPath() + "/bundle-XXXXXXXX";
00521 char name[namestr.length()];
00522 ::strcpy(name, namestr.c_str());
00523
00524 int fd = mkstemp(name);
00525 if (fd == -1) throw ibrcommon::IOException("Could not create a temporary name.");
00526 ::close(fd);
00527
00528 _container = ibrcommon::File(name);
00529 }
00530
00531 SimpleBundleStorage::BundleContainer::Holder::~Holder()
00532 {
00533 if (_mode == MODE_PERSISTENT && deletion)
00534 {
00535 _container.remove();
00536 }
00537 }
00538
00539 size_t SimpleBundleStorage::BundleContainer::Holder::size() const
00540 {
00541 return _size;
00542 }
00543
00544 void SimpleBundleStorage::BundleContainer::Holder::invokeStore()
00545 {
00546 try {
00547 std::fstream out(_container.getPath().c_str(), ios::in|ios::out|ios::binary|ios::trunc);
00548 out.exceptions(std::ios::badbit | std::ios::failbit | std::ios::eofbit);
00549 dtn::data::DefaultSerializer(out) << _bundle; out << std::flush;
00550 out.close();
00551 _size = _container.size();
00552 } catch (ios_base::failure ex) {
00553 throw dtn::SerializationFailedException("can not write data to the storage; " + std::string(ex.what()));
00554 }
00555 }
00556
00557 SimpleBundleStorage::TaskStoreBundle::TaskStoreBundle(const SimpleBundleStorage::BundleContainer &container)
00558 : _container(container)
00559 {
00560 }
00561
00562 SimpleBundleStorage::TaskStoreBundle::~TaskStoreBundle()
00563 {
00564
00565 }
00566
00567 void SimpleBundleStorage::TaskStoreBundle::run()
00568 {
00569 _container.invokeStore();
00570 }
00571 }
00572 }