00001 #include "core/SimpleBundleStorage.h"
00002 #include "core/TimeEvent.h"
00003 #include "core/GlobalEvent.h"
00004 #include "core/BundleExpiredEvent.h"
00005 #include "core/BundleEvent.h"
00006
00007 #include <ibrdtn/utils/Utils.h>
00008 #include <ibrcommon/thread/MutexLock.h>
00009 #include <ibrcommon/Logger.h>
00010 #include <ibrcommon/AutoDelete.h>
00011
00012 #include <string.h>
00013 #include <stdlib.h>
00014 #include <iostream>
00015 #include <fstream>
00016
00017 namespace dtn
00018 {
00019 namespace core
00020 {
00021 SimpleBundleStorage::SimpleBundleStorage(size_t maxsize)
00022 : _store(maxsize), _running(true)
00023 {
00024 bindEvent(TimeEvent::className);
00025 bindEvent(GlobalEvent::className);
00026 }
00027
00028 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize)
00029 : _store(workdir, maxsize), _running(true)
00030 {
00031 }
00032
00033 SimpleBundleStorage::~SimpleBundleStorage()
00034 {
00035 }
00036
00037 void SimpleBundleStorage::componentUp()
00038 {
00039 bindEvent(TimeEvent::className);
00040 }
00041
00042 void SimpleBundleStorage::componentDown()
00043 {
00044 unbindEvent(TimeEvent::className);
00045
00046 _tasks.unblock();
00047 }
00048
00049 void SimpleBundleStorage::componentRun()
00050 {
00051 try {
00052 while (isRunning())
00053 {
00054 Task *t = _tasks.blockingpop();
00055 ibrcommon::AutoDelete<Task> ad(t);
00056
00057 try {
00058 t->run();
00059 } catch (...) {
00060
00061 }
00062 }
00063 } catch (ibrcommon::Exception) {
00064
00065 }
00066 }
00067
00068 void SimpleBundleStorage::raiseEvent(const Event *evt)
00069 {
00070 const TimeEvent *time = dynamic_cast<const TimeEvent*>(evt);
00071
00072 if (time != NULL)
00073 {
00074 if (time->getAction() == dtn::core::TIME_SECOND_TICK)
00075 {
00076 _store.expire(time->getTimestamp());
00077 }
00078 }
00079 }
00080
00081 void SimpleBundleStorage::clear()
00082 {
00083
00084 _store.clear();
00085 }
00086
00087 bool SimpleBundleStorage::empty()
00088 {
00089 ibrcommon::MutexLock l(_store.bundleslock);
00090 return _store.bundles.empty();
00091 }
00092
00093 unsigned int SimpleBundleStorage::count()
00094 {
00095 return _store.count();
00096 }
00097
00098 size_t SimpleBundleStorage::size() const
00099 {
00100 return _store.size();
00101 }
00102
00103 void SimpleBundleStorage::releaseCustody(dtn::data::BundleID&)
00104 {
00105
00106
00107 }
00108
00109 unsigned int SimpleBundleStorage::BundleStore::count()
00110 {
00111 ibrcommon::MutexLock l(bundleslock);
00112 return bundles.size();
00113 }
00114
00115 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle)
00116 {
00117 _store.store(bundle, *this);
00118 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: stored bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00119 dtn::core::BundleEvent::raise(bundle, dtn::core::BUNDLE_STORED);
00120 }
00121
00122 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::EID &eid)
00123 {
00124 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: get bundle for " << eid.getString() << IBRCOMMON_LOGGER_ENDL;
00125 return _store.get(eid);
00126 }
00127
00128 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id)
00129 {
00130 return _store.get(id);
00131 }
00132
00133 dtn::data::Bundle SimpleBundleStorage::get(const ibrcommon::BloomFilter &filter)
00134 {
00135 return _store.get(filter);
00136 }
00137
00138 const std::list<dtn::data::BundleID> SimpleBundleStorage::getList()
00139 {
00140 return _store.getList();
00141 }
00142
00143 void SimpleBundleStorage::remove(const dtn::data::BundleID &id)
00144 {
00145 _store.remove(id);
00146 }
00147
00148 SimpleBundleStorage::BundleStore::BundleStore(size_t maxsize)
00149 : _mode(MODE_NONPERSISTENT), _maxsize(maxsize), _currentsize(0)
00150 {
00151 }
00152
00153 SimpleBundleStorage::BundleStore::BundleStore(ibrcommon::File workdir, size_t maxsize)
00154 : _workdir(workdir), _mode(MODE_PERSISTENT), _maxsize(maxsize), _currentsize(0)
00155 {
00156
00157 std::list<ibrcommon::File> files;
00158 _workdir.getFiles(files);
00159
00160 for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++)
00161 {
00162 ibrcommon::File &file = (*iter);
00163 if (!file.isDirectory() && !file.isSystem())
00164 {
00165 try {
00166
00167 load(file);
00168 } catch (ibrcommon::IOException ex) {
00169
00170 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00171
00172
00173 file.remove();
00174 } catch (dtn::InvalidDataException ex) {
00175
00176 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
00177
00178
00179 file.remove();
00180 }
00181 }
00182 }
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193 IBRCOMMON_LOGGER(info) << bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
00194 }
00195
00196 SimpleBundleStorage::BundleStore::~BundleStore()
00197 {
00198 }
00199
00200 dtn::data::Bundle SimpleBundleStorage::BundleStore::get(const ibrcommon::BloomFilter &filter)
00201 {
00202
00203
00204 ibrcommon::MutexLock l(bundleslock);
00205 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00206 {
00207 const BundleContainer &container = (*iter);
00208
00209 if (!filter.contains(container.toString()))
00210 {
00211 IBRCOMMON_LOGGER_DEBUG(10) << container.toString() << " is not in the bloomfilter" << IBRCOMMON_LOGGER_ENDL;
00212 return container.get();
00213 }
00214 }
00215
00216 throw BundleStorage::NoBundleFoundException();
00217 }
00218
00219 dtn::data::Bundle SimpleBundleStorage::BundleStore::get(const dtn::data::EID &eid)
00220 {
00221 ibrcommon::MutexLock l(bundleslock);
00222 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00223 {
00224 const BundleContainer &bundle = (*iter);
00225 if (bundle.destination == eid)
00226 {
00227 return bundle.get();
00228 }
00229 }
00230
00231 throw BundleStorage::NoBundleFoundException();
00232 }
00233
00234 dtn::data::Bundle SimpleBundleStorage::BundleStore::get(const dtn::data::BundleID &id)
00235 {
00236 ibrcommon::MutexLock l(bundleslock);
00237 try {
00238 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00239 {
00240 const BundleContainer &bundle = (*iter);
00241 if (id == bundle)
00242 {
00243 return bundle.get();
00244 }
00245 }
00246 } catch (dtn::SerializationFailedException) {
00247
00248 }
00249
00250 throw dtn::core::BundleStorage::NoBundleFoundException();
00251 }
00252
00253 void SimpleBundleStorage::BundleStore::expire(const size_t timestamp)
00254 {
00255 ibrcommon::MutexLock l(bundleslock);
00256 dtn::data::BundleList::expire(timestamp);
00257 }
00258
00259 void SimpleBundleStorage::BundleStore::load(const ibrcommon::File &file)
00260 {
00261 ibrcommon::MutexLock l(bundleslock);
00262
00263 BundleContainer container(file);
00264 bundles.insert( container );
00265
00266
00267 _currentsize += container.size();
00268
00269
00270 dtn::data::BundleList::add(container);
00271 }
00272
00273 void SimpleBundleStorage::BundleStore::store(const dtn::data::Bundle &bundle, SimpleBundleStorage &storage)
00274 {
00275 ibrcommon::MutexLock l(bundleslock);
00276
00277 pair<set<BundleContainer>::iterator,bool> ret;
00278
00279
00280 BundleContainer container(bundle);
00281
00282
00283 if ((_maxsize > 0) && (_currentsize + container.size() > _maxsize))
00284 {
00285 throw StorageSizeExeededException();
00286 }
00287
00288
00289 _currentsize += container.size();
00290
00291
00292 if (_mode == MODE_PERSISTENT)
00293 {
00294 container = BundleContainer(bundle, _workdir, container.size());
00295
00296
00297 storage._tasks.push( new SimpleBundleStorage::TaskStoreBundle(container) );
00298 }
00299
00300
00301 ret = bundles.insert( container );
00302
00303 if (ret.second)
00304 {
00305 dtn::data::BundleList::add(dtn::data::MetaBundle(bundle));
00306 }
00307 else
00308 {
00309 IBRCOMMON_LOGGER_DEBUG(5) << "Storage: got bundle duplicate " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00310 }
00311 }
00312
00313 void SimpleBundleStorage::BundleStore::remove(const dtn::data::BundleID &id)
00314 {
00315 ibrcommon::MutexLock l(bundleslock);
00316
00317 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00318 {
00319 if ( id == (*iter) )
00320 {
00321
00322 BundleContainer container = (*iter);
00323
00324
00325 dtn::data::BundleList::remove(container);
00326
00327
00328 _currentsize -= container.size();
00329
00330
00331 container.remove();
00332
00333
00334 bundles.erase(iter);
00335
00336 return;
00337 }
00338 }
00339
00340 throw BundleStorage::NoBundleFoundException();
00341 }
00342
00343 size_t SimpleBundleStorage::BundleStore::size() const
00344 {
00345 return _currentsize;
00346 }
00347
00348 void SimpleBundleStorage::BundleStore::clear()
00349 {
00350 ibrcommon::MutexLock l(bundleslock);
00351
00352
00353 for (std::set<BundleContainer>::iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00354 {
00355 BundleContainer container = (*iter);
00356
00357
00358 container.remove();
00359 }
00360
00361 bundles.clear();
00362 dtn::data::BundleList::clear();
00363
00364
00365 _currentsize = 0;
00366 }
00367
00368 void SimpleBundleStorage::BundleStore::eventBundleExpired(const ExpiringBundle &b)
00369 {
00370 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00371 {
00372 if ( b.bundle == (*iter) )
00373 {
00374 BundleContainer container = (*iter);
00375
00376
00377 _currentsize -= container.size();
00378
00379 container.remove();
00380 bundles.erase(iter);
00381 break;
00382 }
00383 }
00384
00385
00386 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED);
00387
00388
00389 dtn::core::BundleExpiredEvent::raise( b.bundle );
00390 }
00391
00392 const std::list<dtn::data::BundleID> SimpleBundleStorage::BundleStore::getList()
00393 {
00394 ibrcommon::MutexLock l(bundleslock);
00395 std::list<dtn::data::BundleID> ret;
00396
00397 for (std::set<BundleContainer>::const_iterator iter = bundles.begin(); iter != bundles.end(); iter++)
00398 {
00399 const BundleContainer &container = (*iter);
00400 ret.push_back( container );
00401 }
00402
00403 return ret;
00404 }
00405
00406 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
00407 : dtn::data::MetaBundle(b), _holder( new SimpleBundleStorage::BundleContainer::Holder(b) )
00408 {
00409 }
00410
00411 SimpleBundleStorage::BundleContainer::BundleContainer(const ibrcommon::File &file)
00412 : dtn::data::MetaBundle(), _holder( new SimpleBundleStorage::BundleContainer::Holder(file) )
00413 {
00414 dtn::data::Bundle b = _holder->getBundle();
00415 (dtn::data::MetaBundle&)(*this) = dtn::data::MetaBundle(b);
00416 }
00417
00418 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b, const ibrcommon::File &workdir, const size_t size)
00419 : dtn::data::MetaBundle(b), _holder( new SimpleBundleStorage::BundleContainer::Holder(b, workdir, size) )
00420 {
00421 }
00422
00423 SimpleBundleStorage::BundleContainer::~BundleContainer()
00424 {
00425 down();
00426 }
00427
00428 void SimpleBundleStorage::BundleContainer::down()
00429 {
00430 try {
00431 ibrcommon::MutexLock l(_holder->lock);
00432 if (--_holder->_count == 0) throw true;
00433 } catch (bool) {
00434 delete _holder;
00435 }
00436 }
00437
00438 size_t SimpleBundleStorage::BundleContainer::size() const
00439 {
00440 return _holder->size();
00441 }
00442
00443 bool SimpleBundleStorage::BundleContainer::operator<(const SimpleBundleStorage::BundleContainer& other) const
00444 {
00445 MetaBundle &left = (MetaBundle&)*this;
00446 MetaBundle &right = (MetaBundle&)other;
00447
00448 if (left.getPriority() > right.getPriority()) return true;
00449 if (left.getPriority() != right.getPriority()) return false;
00450 return (left < right);
00451 }
00452
00453 bool SimpleBundleStorage::BundleContainer::operator>(const SimpleBundleStorage::BundleContainer& other) const
00454 {
00455 MetaBundle &left = (MetaBundle&)*this;
00456 MetaBundle &right = (MetaBundle&)other;
00457
00458 if (left.getPriority() < right.getPriority()) return true;
00459 if (left.getPriority() != right.getPriority()) return false;
00460 return (left > right);
00461 }
00462
00463 bool SimpleBundleStorage::BundleContainer::operator!=(const SimpleBundleStorage::BundleContainer& other) const
00464 {
00465 return !((MetaBundle&)(*this) == (MetaBundle&)other);
00466 }
00467
00468 bool SimpleBundleStorage::BundleContainer::operator==(const SimpleBundleStorage::BundleContainer& other) const
00469 {
00470 return ((MetaBundle&)(*this) == (MetaBundle&)other);
00471 }
00472
00473 SimpleBundleStorage::BundleContainer& SimpleBundleStorage::BundleContainer::operator= (const SimpleBundleStorage::BundleContainer &right)
00474 {
00475 ibrcommon::MutexLock l(right._holder->lock);
00476 ++right._holder->_count;
00477
00478 down();
00479
00480 _holder = right._holder;
00481 return *this;
00482 }
00483
00484 SimpleBundleStorage::BundleContainer& SimpleBundleStorage::BundleContainer::operator= (SimpleBundleStorage::BundleContainer &right)
00485 {
00486 ibrcommon::MutexLock l(right._holder->lock);
00487 ++right._holder->_count;
00488
00489 down();
00490
00491 _holder = right._holder;
00492 return *this;
00493 }
00494
00495 SimpleBundleStorage::BundleContainer::BundleContainer(const SimpleBundleStorage::BundleContainer& right)
00496 : dtn::data::MetaBundle(right), _holder(right._holder)
00497 {
00498 ibrcommon::MutexLock l(_holder->lock);
00499 ++_holder->_count;
00500 }
00501
00502 dtn::data::Bundle SimpleBundleStorage::BundleContainer::get() const
00503 {
00504 return _holder->getBundle();
00505 }
00506
00507 void SimpleBundleStorage::BundleContainer::remove()
00508 {
00509 _holder->remove();
00510 }
00511
00512 void SimpleBundleStorage::BundleContainer::invokeStore()
00513 {
00514 _holder->invokeStore();
00515 }
00516
00517 SimpleBundleStorage::BundleContainer::Holder::Holder( const dtn::data::Bundle &b )
00518 : _count(1), _bundle(b), _state(HOLDER_MEMORY)
00519 {
00520 dtn::data::DefaultSerializer s(std::cout);
00521 _size = s.getLength(_bundle);
00522 }
00523
00524 SimpleBundleStorage::BundleContainer::Holder::Holder( const ibrcommon::File &file )
00525 : _count(1), _bundle(), _container(file), _state(HOLDER_STORED)
00526 {
00527 _size = file.size();
00528 }
00529
00530 SimpleBundleStorage::BundleContainer::Holder::Holder( const dtn::data::Bundle &b, const ibrcommon::File &workdir, const size_t size )
00531 : _count(1), _bundle(b), _state(HOLDER_PENDING), _size(size)
00532 {
00533 std::string namestr = workdir.getPath() + "/bundle-XXXXXXXX";
00534 char name[namestr.length()];
00535 ::strcpy(name, namestr.c_str());
00536
00537 int fd = mkstemp(name);
00538 if (fd == -1) throw ibrcommon::IOException("Could not create a temporary name.");
00539 ::close(fd);
00540
00541 _container = ibrcommon::File(name);
00542 }
00543
00544 SimpleBundleStorage::BundleContainer::Holder::~Holder()
00545 {
00546 ibrcommon::MutexLock l(_state_lock);
00547 if (_state == HOLDER_DELETED)
00548 {
00549 _container.remove();
00550 }
00551 }
00552
00553 size_t SimpleBundleStorage::BundleContainer::Holder::size() const
00554 {
00555 return _size;
00556 }
00557
00558 dtn::data::Bundle SimpleBundleStorage::BundleContainer::Holder::getBundle()
00559 {
00560 ibrcommon::MutexLock l(_state_lock);
00561 if (_state == HOLDER_DELETED)
00562 {
00563 throw dtn::SerializationFailedException("bundle deleted");
00564 }
00565 else if (_state == HOLDER_STORED)
00566 {
00567 dtn::data::Bundle bundle;
00568
00569
00570 std::fstream fs(_container.getPath().c_str(), ios::in|ios::binary);
00571 try {
00572 fs.exceptions(std::ios::badbit | std::ios::failbit | std::ios::eofbit);
00573 dtn::data::DefaultDeserializer(fs, dtn::core::BundleCore::getInstance()) >> bundle;
00574 } catch (ios_base::failure ex) {
00575 throw dtn::SerializationFailedException("can not load bundle data" + std::string(ex.what()));
00576 }
00577
00578 fs.close();
00579
00580 return bundle;
00581 }
00582 else
00583 {
00584 return _bundle;
00585 }
00586 }
00587
00588 void SimpleBundleStorage::BundleContainer::Holder::remove()
00589 {
00590 ibrcommon::MutexLock l(_state_lock);
00591 _state = HOLDER_DELETED;
00592 }
00593
00594 void SimpleBundleStorage::BundleContainer::Holder::invokeStore()
00595 {
00596 ibrcommon::MutexLock l(_state_lock);
00597 try {
00598 if (_state == HOLDER_PENDING)
00599 {
00600 std::fstream out(_container.getPath().c_str(), ios::in|ios::out|ios::binary|ios::trunc);
00601 out.exceptions(std::ios::badbit | std::ios::failbit | std::ios::eofbit);
00602 dtn::data::DefaultSerializer(out) << _bundle; out << std::flush;
00603 out.close();
00604 _size = _container.size();
00605 _bundle = dtn::data::Bundle();
00606 _state = HOLDER_STORED;
00607 }
00608 } catch (ios_base::failure ex) {
00609 throw dtn::SerializationFailedException("can not write data to the storage; " + std::string(ex.what()));
00610 }
00611 }
00612
00613 SimpleBundleStorage::TaskStoreBundle::TaskStoreBundle(const SimpleBundleStorage::BundleContainer &container)
00614 : _container(container)
00615 {
00616 }
00617
00618 SimpleBundleStorage::TaskStoreBundle::~TaskStoreBundle()
00619 {
00620
00621 }
00622
00623 void SimpleBundleStorage::TaskStoreBundle::run()
00624 {
00625 _container.invokeStore();
00626 }
00627 }
00628 }