|
IBR-DTNSuite 0.6
|
00001 #include "core/SimpleBundleStorage.h" 00002 #include "core/TimeEvent.h" 00003 #include "core/GlobalEvent.h" 00004 #include "core/BundleExpiredEvent.h" 00005 #include "core/BundleEvent.h" 00006 00007 #include <ibrdtn/data/AgeBlock.h> 00008 #include <ibrdtn/utils/Utils.h> 00009 #include <ibrcommon/thread/MutexLock.h> 00010 #include <ibrcommon/Logger.h> 00011 #include <ibrcommon/AutoDelete.h> 00012 00013 #include <string.h> 00014 #include <stdlib.h> 00015 #include <iostream> 00016 #include <fstream> 00017 #include <cstring> 00018 #include <cerrno> 00019 00020 namespace dtn 00021 { 00022 namespace core 00023 { 00024 SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, size_t maxsize, size_t buffer_limit) 00025 : _datastore(*this, workdir, buffer_limit), _maxsize(maxsize), _currentsize(0) 00026 { 00027 // load persistent bundles 00028 _datastore.iterateAll(); 00029 00030 // some output 00031 IBRCOMMON_LOGGER(info) << _bundles.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL; 00032 } 00033 00034 SimpleBundleStorage::~SimpleBundleStorage() 00035 { 00036 } 00037 00038 void SimpleBundleStorage::eventDataStorageStored(const dtn::core::DataStorage::Hash &hash) 00039 { 00040 ibrcommon::MutexLock l(_bundleslock); 00041 dtn::data::MetaBundle meta(_pending_bundles[hash]); 00042 _pending_bundles.erase(hash); 00043 _stored_bundles[meta] = hash; 00044 } 00045 00046 void SimpleBundleStorage::eventDataStorageStoreFailed(const dtn::core::DataStorage::Hash &hash, const ibrcommon::Exception &ex) 00047 { 00048 IBRCOMMON_LOGGER(error) << "store failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00049 00050 ibrcommon::MutexLock l(_bundleslock); 00051 00052 // get the reference to the bundle 00053 const dtn::data::Bundle &b = _pending_bundles[hash]; 00054 00055 // decrement the storage size 00056 _currentsize -= _bundle_size[b]; 00057 00058 // cleanup bundle sizes 00059 _bundle_size.erase(b); 00060 00061 // delete the pending bundle 00062 _pending_bundles.erase(hash); 00063 } 00064 00065 void SimpleBundleStorage::eventDataStorageRemoved(const dtn::core::DataStorage::Hash &hash) 00066 { 00067 ibrcommon::MutexLock l(_bundleslock); 00068 00069 for (std::map<dtn::data::MetaBundle, DataStorage::Hash>::iterator iter = _stored_bundles.begin(); 00070 iter != _stored_bundles.end(); iter++) 00071 { 00072 if (iter->second == hash) 00073 { 00074 // decrement the storage size 00075 _currentsize -= _bundle_size[iter->first]; 00076 00077 _bundle_size.erase(iter->first); 00078 00079 _stored_bundles.erase(iter); 00080 return; 00081 } 00082 } 00083 } 00084 00085 void SimpleBundleStorage::eventDataStorageRemoveFailed(const dtn::core::DataStorage::Hash&, const ibrcommon::Exception &ex) 00086 { 00087 IBRCOMMON_LOGGER(error) << "remove failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00088 } 00089 00090 void SimpleBundleStorage::iterateDataStorage(const dtn::core::DataStorage::Hash &hash, dtn::core::DataStorage::istream &stream) 00091 { 00092 try { 00093 dtn::data::Bundle bundle; 00094 dtn::data::DefaultDeserializer ds(*stream); 00095 00096 // load a bundle into the storage 00097 ds >> bundle; 00098 00099 // extract meta data 00100 dtn::data::MetaBundle meta(bundle); 00101 00102 // lock the bundle lists 00103 ibrcommon::MutexLock l(_bundleslock); 00104 00105 // add the bundle to the stored bundles 00106 _stored_bundles[meta] = hash; 00107 00108 // increment the storage size 00109 _bundle_size[meta] = (*stream).tellg(); 00110 _currentsize += (*stream).tellg(); 00111 00112 // add it to the bundle list 00113 dtn::data::BundleList::add(meta); 00114 _priority_index.insert(meta); 00115 00116 } catch (const std::exception&) { 00117 // report this error to the console 00118 IBRCOMMON_LOGGER(error) << "Error: Unable to restore bundle in file " << hash.value << IBRCOMMON_LOGGER_ENDL; 00119 00120 // error while reading file 00121 _datastore.remove(hash); 00122 } 00123 } 00124 00125 dtn::data::Bundle SimpleBundleStorage::__get(const dtn::data::MetaBundle &meta) 00126 { 00127 DataStorage::Hash hash(meta.toString()); 00128 std::map<DataStorage::Hash, dtn::data::Bundle>::iterator it = _pending_bundles.find(hash); 00129 00130 if (_pending_bundles.end() != it) 00131 { 00132 return it->second; 00133 } 00134 00135 try { 00136 DataStorage::istream stream = _datastore.retrieve(hash); 00137 00138 // load the bundle from the storage 00139 dtn::data::Bundle bundle; 00140 00141 // load the bundle from file 00142 try { 00143 dtn::data::DefaultDeserializer(*stream) >> bundle; 00144 } catch (const std::exception &ex) { 00145 throw dtn::SerializationFailedException("bundle get failed: " + std::string(ex.what())); 00146 } 00147 00148 try { 00149 dtn::data::AgeBlock &agebl = bundle.getBlock<dtn::data::AgeBlock>(); 00150 00151 // modify the AgeBlock with the age of the file 00152 time_t age = stream.lastaccess() - stream.lastmodify(); 00153 00154 agebl.addSeconds(age); 00155 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }; 00156 00157 return bundle; 00158 } catch (const DataStorage::DataNotAvailableException&) { 00159 throw BundleStorage::NoBundleFoundException(); 00160 } 00161 } 00162 00163 void SimpleBundleStorage::componentUp() 00164 { 00165 bindEvent(TimeEvent::className); 00166 _datastore.start(); 00167 } 00168 00169 void SimpleBundleStorage::componentDown() 00170 { 00171 unbindEvent(TimeEvent::className); 00172 _datastore.stop(); 00173 _datastore.join(); 00174 } 00175 00176 void SimpleBundleStorage::raiseEvent(const Event *evt) 00177 { 00178 try { 00179 const TimeEvent &time = dynamic_cast<const TimeEvent&>(*evt); 00180 if (time.getAction() == dtn::core::TIME_SECOND_TICK) 00181 { 00182 ibrcommon::MutexLock l(_bundleslock); 00183 dtn::data::BundleList::expire(time.getTimestamp()); 00184 } 00185 } catch (const std::bad_cast&) { } 00186 } 00187 00188 const std::string SimpleBundleStorage::getName() const 00189 { 00190 return "SimpleBundleStorage"; 00191 } 00192 00193 bool SimpleBundleStorage::empty() 00194 { 00195 ibrcommon::MutexLock l(_bundleslock); 00196 return dtn::data::BundleList::empty(); 00197 } 00198 00199 void SimpleBundleStorage::releaseCustody(const dtn::data::EID&, const dtn::data::BundleID&) 00200 { 00201 // custody is successful transferred to another node. 00202 // it is safe to delete this bundle now. (depending on the routing algorithm.) 00203 } 00204 00205 unsigned int SimpleBundleStorage::count() 00206 { 00207 ibrcommon::MutexLock l(_bundleslock); 00208 return dtn::data::BundleList::size(); 00209 } 00210 00211 const std::list<dtn::data::MetaBundle> SimpleBundleStorage::get(BundleFilterCallback &cb) 00212 { 00213 // result list 00214 std::list<dtn::data::MetaBundle> result; 00215 00216 // we have to iterate through all bundles 00217 ibrcommon::MutexLock l(_bundleslock); 00218 00219 for (std::set<dtn::data::MetaBundle>::const_iterator iter = _priority_index.begin(); (iter != _priority_index.end()) && (result.size() < cb.limit()); iter++) 00220 { 00221 const dtn::data::MetaBundle &meta = (*iter); 00222 00223 if ( cb.shouldAdd(meta) ) 00224 { 00225 result.push_back(meta); 00226 } 00227 } 00228 00229 return result; 00230 } 00231 00232 dtn::data::Bundle SimpleBundleStorage::get(const dtn::data::BundleID &id) 00233 { 00234 try { 00235 ibrcommon::MutexLock l(_bundleslock); 00236 00237 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00238 { 00239 const dtn::data::MetaBundle &meta = (*iter); 00240 if (id == meta) 00241 { 00242 return __get(meta); 00243 } 00244 } 00245 } catch (const dtn::SerializationFailedException &ex) { 00246 // bundle loading failed 00247 IBRCOMMON_LOGGER(error) << "Error while loading bundle data: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00248 00249 // the bundle is broken, delete it 00250 remove(id); 00251 00252 throw BundleStorage::BundleLoadException(); 00253 } 00254 00255 throw BundleStorage::NoBundleFoundException(); 00256 } 00257 00258 void SimpleBundleStorage::store(const dtn::data::Bundle &bundle) 00259 { 00260 // get the bundle size 00261 dtn::data::DefaultSerializer s(std::cout); 00262 size_t bundle_size = s.getLength(bundle); 00263 00264 // store the bundle 00265 BundleContainer *bc = new BundleContainer(bundle); 00266 DataStorage::Hash hash(*bc); 00267 00268 // check if this container is too big for us. 00269 { 00270 ibrcommon::MutexLock l(_bundleslock); 00271 if ((_maxsize > 0) && (_currentsize + bundle_size > _maxsize)) 00272 { 00273 throw StorageSizeExeededException(); 00274 } 00275 00276 // create meta data object 00277 dtn::data::MetaBundle meta(bundle); 00278 00279 // add the bundle to the stored bundles 00280 _pending_bundles[hash] = bundle; 00281 00282 // increment the storage size 00283 _bundle_size[meta] = bundle_size; 00284 _currentsize += bundle_size; 00285 00286 // add it to the bundle list 00287 dtn::data::BundleList::add(meta); 00288 _priority_index.insert(meta); 00289 } 00290 00291 // put the bundle into the data store 00292 _datastore.store(hash, bc); 00293 } 00294 00295 void SimpleBundleStorage::remove(const dtn::data::BundleID &id) 00296 { 00297 ibrcommon::MutexLock l(_bundleslock); 00298 00299 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00300 { 00301 if ((*iter) == id) 00302 { 00303 // remove item in the bundlelist 00304 dtn::data::MetaBundle meta = (*iter); 00305 00306 // remove it from the bundle list 00307 dtn::data::BundleList::remove(meta); 00308 _priority_index.erase(meta); 00309 00310 DataStorage::Hash hash(meta.toString()); 00311 00312 // create a background task for removing the bundle 00313 _datastore.remove(hash); 00314 00315 return; 00316 } 00317 } 00318 00319 throw BundleStorage::NoBundleFoundException(); 00320 } 00321 00322 dtn::data::MetaBundle SimpleBundleStorage::remove(const ibrcommon::BloomFilter &filter) 00323 { 00324 ibrcommon::MutexLock l(_bundleslock); 00325 00326 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00327 { 00328 // remove item in the bundlelist 00329 const dtn::data::MetaBundle meta = (*iter); 00330 00331 if ( filter.contains(meta.toString()) ) 00332 { 00333 // remove it from the bundle list 00334 dtn::data::BundleList::remove(meta); 00335 _priority_index.erase(meta); 00336 00337 DataStorage::Hash hash(meta.toString()); 00338 00339 // create a background task for removing the bundle 00340 _datastore.remove(hash); 00341 00342 return meta; 00343 } 00344 } 00345 00346 throw BundleStorage::NoBundleFoundException(); 00347 } 00348 00349 size_t SimpleBundleStorage::size() const 00350 { 00351 return _currentsize; 00352 } 00353 00354 void SimpleBundleStorage::clear() 00355 { 00356 ibrcommon::MutexLock l(_bundleslock); 00357 00358 // mark all bundles for deletion 00359 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00360 { 00361 // remove item in the bundlelist 00362 const dtn::data::MetaBundle meta = (*iter); 00363 00364 DataStorage::Hash hash(meta.toString()); 00365 00366 // create a background task for removing the bundle 00367 _datastore.remove(hash); 00368 } 00369 00370 _bundles.clear(); 00371 _priority_index.clear(); 00372 dtn::data::BundleList::clear(); 00373 00374 // set the storage size to zero 00375 _currentsize = 0; 00376 } 00377 00378 void SimpleBundleStorage::eventBundleExpired(const ExpiringBundle &b) 00379 { 00380 for (std::set<dtn::data::MetaBundle>::const_iterator iter = begin(); iter != end(); iter++) 00381 { 00382 if ((*iter) == b.bundle) 00383 { 00384 // remove the bundle 00385 const dtn::data::MetaBundle &meta = (*iter); 00386 00387 DataStorage::Hash hash(meta.toString()); 00388 00389 // create a background task for removing the bundle 00390 _datastore.remove(hash); 00391 00392 // remove the bundle off the index 00393 _priority_index.erase(meta); 00394 00395 break; 00396 } 00397 } 00398 00399 // raise bundle event 00400 dtn::core::BundleEvent::raise( b.bundle, dtn::core::BUNDLE_DELETED, dtn::data::StatusReportBlock::LIFETIME_EXPIRED); 00401 00402 // raise an event 00403 dtn::core::BundleExpiredEvent::raise( b.bundle ); 00404 } 00405 00406 SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b) 00407 : _bundle(b) 00408 { } 00409 00410 SimpleBundleStorage::BundleContainer::~BundleContainer() 00411 { } 00412 00413 std::string SimpleBundleStorage::BundleContainer::getKey() const 00414 { 00415 return dtn::data::BundleID(_bundle).toString(); 00416 } 00417 00418 std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream) 00419 { 00420 // get an serializer for bundles 00421 dtn::data::DefaultSerializer s(stream); 00422 00423 // length of the bundle 00424 unsigned int size = s.getLength(_bundle); 00425 00426 // serialize the bundle 00427 s << _bundle; stream.flush(); 00428 00429 // check the streams health 00430 if (!stream.good()) 00431 { 00432 std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]"; 00433 throw dtn::SerializationFailedException(ss.str()); 00434 } 00435 00436 // get the write position 00437 if (size > stream.tellp()) 00438 { 00439 std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]"; 00440 throw dtn::SerializationFailedException(ss.str()); 00441 } 00442 00443 // return the stream, this allows stacking 00444 return stream; 00445 } 00446 } 00447 }