IBR-DTN  1.0.0
SimpleBundleStorage.cpp
Go to the documentation of this file.
1 /*
2  * SimpleBundleStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
23 #include "core/EventDispatcher.h"
25 #include "core/BundleEvent.h"
26 
27 #include <ibrdtn/data/AgeBlock.h>
28 #include <ibrdtn/utils/Utils.h>
29 #include <ibrdtn/utils/Clock.h>
30 #include <ibrcommon/thread/RWLock.h>
31 #include <ibrcommon/Logger.h>
32 
33 #include <memory>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <iostream>
37 #include <iomanip>
38 #include <fstream>
39 #include <cstring>
40 #include <cerrno>
41 
42 namespace dtn
43 {
44  namespace storage
45  {
// Tag handed to the ibrcommon logger macros by the methods of this class.
46  const std::string SimpleBundleStorage::TAG = "SimpleBundleStorage";
47 
// Constructor: forwards maxsize to the BundleStorage base, sets up the data
// store with (*this, workdir, buffer_limit) and the meta store with a pointer
// to this object. The body itself does nothing.
48  SimpleBundleStorage::SimpleBundleStorage(const ibrcommon::File &workdir, const dtn::data::Length maxsize, const unsigned int buffer_limit)
49  : BundleStorage(maxsize), _datastore(*this, workdir, buffer_limit), _metastore(this)
50  {
51  }
52 
// NOTE(review): the signature line (listing line 53) is missing from this
// extract; presumably this is the destructor - confirm against the original
// file. The body is empty.
54  {
55  }
56 
// NOTE(review): the signature line (listing line 57) is missing from this
// extract; judging by the log text this is the data store's "stored"
// callback taking a hash - confirm against the original file.
// Logs the stored hash at debug level and erases the matching entry from
// the pending bundles while holding _pending_lock.
58  {
59  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) << "element successfully stored: " << hash.value << IBRCOMMON_LOGGER_ENDL;
60 
61  ibrcommon::RWLock l(_pending_lock);
62  _pending_bundles.erase(hash);
63  }
64 
// NOTE(review): the signature line (listing line 65) is missing from this
// extract; judging by the log text this is the data store's "store failed"
// callback taking a hash and an exception - confirm.
// Logs the failure, drops the pending entry for the hash and removes the
// bundle's meta data, giving back the space it occupied.
// NOTE(review): `meta` is used below but never declared in the visible
// lines; listing lines 69 and 76 are missing from this extract.
66  {
67  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "store of element " << hash.value << " failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
68 
70 
71  {
72  ibrcommon::MutexLock l(_pending_lock);
73 
74  // get the reference to the bundle
75  const dtn::data::Bundle &b = _pending_bundles[hash];
77  }
78 
79  {
80  ibrcommon::RWLock l(_pending_lock);
81 
82  // delete the pending bundle
83  _pending_bundles.erase(hash);
84  }
85 
86  ibrcommon::RWLock l(_meta_lock);
87 
88  // remove bundle and decrement the storage size
89  freeSpace( _metastore.remove(meta) );
90  }
91 
// NOTE(review): the signature line (listing line 92) is missing from this
// extract; judging by the log text this is the data store's "removed"
// callback taking a hash - confirm.
// Logs the removal, then walks the meta store under _meta_lock looking for
// the entry whose generated id equals the given hash. The first match is
// removed and its size is released; if nothing matches, nothing happens.
93  {
94  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 30) << "element successfully removed: " << hash.value << IBRCOMMON_LOGGER_ENDL;
95 
96  ibrcommon::RWLock l(_meta_lock);
97 
    // linear scan: each entry's hash is recomputed from its meta data
98  for (MetaStorage::const_iterator it = _metastore.begin(); it != _metastore.end(); ++it)
99  {
100  const dtn::data::MetaBundle &meta = (*it);
101  DataStorage::Hash it_hash(BundleContainer::createId(meta));
102 
103  if (it_hash == hash)
104  {
105  // remove bundle and decrement the storage size
106  freeSpace( _metastore.remove(meta) );
107 
    // leave right after the removal; the iterator is not used again
108  return;
109  }
110  }
111  }
112 
// NOTE(review): the signature line (listing line 113) is missing from this
// extract; judging by the log text this is the data store's "remove failed"
// callback taking a hash and an exception - confirm.
// Logs the failed removal.
// NOTE(review): the statement announced by the comment below (listing
// line 118) is missing from this extract.
114  {
115  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "remove of element " << hash.value << " failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
116 
117  // forward this to eventDataStorageRemoved
119  }
120 
// NOTE(review): the signature line (listing line 121) is missing from this
// extract; the body uses `hash` and `stream`, presumably the parameters of
// the data store iteration callback - confirm.
// Restores one stored element: deserializes a bundle from the stream,
// recomputes its hash and either re-stores it (hash mismatch) or registers
// it in the meta store and raises the bundle-added event. Any std::exception
// leads to an error log entry and removal of the element from the data store.
// NOTE(review): `meta` is used below but its declaration (listing line 131)
// is missing from this extract.
122  {
123  try {
124  dtn::data::Bundle bundle;
125  dtn::data::DefaultDeserializer ds(*stream);
126 
127  // load a bundle into the storage
128  ds >> bundle;
129 
130  // extract meta data
132 
133  // check if the hash is different
134  DataStorage::Hash hash2(BundleContainer::createId(meta));
135  if (hash != hash2)
136  {
137  // if hash does not match, remove old file
138  _datastore.remove(hash);
139 
140  // and store bundle again
141  store(bundle);
142 
143  return;
144  }
145 
146  // allocate space for the bundle
    // the read position after deserialization is taken as the bundle size
147  const dtn::data::Length bundle_size = static_cast<dtn::data::Length>( (*stream).tellg() );
148  allocSpace(bundle_size);
149 
150  // lock the bundle lists
151  ibrcommon::RWLock l(_meta_lock);
152 
153  // add the bundle to the stored bundles
154  _metastore.store(meta, bundle_size);
155 
156  // raise bundle added event
157  eventBundleAdded(meta);
158 
159  IBRCOMMON_LOGGER_DEBUG_TAG(SimpleBundleStorage::TAG, 10) << "bundle restored " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
160  } catch (const std::exception&) {
161  // report this error to the console
162  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "Unable to restore bundle from file " << hash.value << IBRCOMMON_LOGGER_ENDL;
163 
164  // error while reading file
165  _datastore.remove(hash);
166  }
167  }
168 
// NOTE(review): the signature line (listing line 169) is missing from this
// extract; presumably this is the component start-up hook - confirm.
// Iterates over all elements of the data store, logs how many bundles the
// meta store holds afterwards and starts the data store. A ThreadException
// from start() is logged and not propagated.
// NOTE(review): listing line 182 is missing from this extract.
170  {
171  // routine checked for throw() on 15.02.2013
172 
173  // load persistent bundles
174  _datastore.iterateAll();
175 
176  // some output
177  {
178  ibrcommon::MutexLock l(_meta_lock);
179  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, info) << _metastore.size() << " Bundles restored." << IBRCOMMON_LOGGER_ENDL;
180  }
181 
183 
184  try {
185  _datastore.start();
186  } catch (const ibrcommon::ThreadException &ex) {
    // the tag literal below has the same value as SimpleBundleStorage::TAG
187  IBRCOMMON_LOGGER_TAG("SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
188  }
189  }
190 
// NOTE(review): the signature line (listing line 191) is missing from this
// extract; presumably this is the component shutdown hook - confirm.
// Calls wait(), stop() and join() on the data store, resets it, then clears
// the meta store and the space accounting under _meta_lock. An
// ibrcommon::Exception raised on the way is logged and not propagated.
// NOTE(review): listing line 195 is missing from this extract.
192  {
193  // routine checked for throw() on 15.02.2013
194 
196  try {
197  _datastore.wait();
198  _datastore.stop();
199  _datastore.join();
200 
201  // reset datastore
202  _datastore.reset();
203 
204  // clear all data structures
205  ibrcommon::RWLock l(_meta_lock);
206  _metastore.clear();
207  clearSpace();
208  } catch (const ibrcommon::Exception &ex) {
    // the tag literal below has the same value as SimpleBundleStorage::TAG
209  IBRCOMMON_LOGGER_TAG("SimpleBundleStorage", error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
210  }
211  }
212 
// NOTE(review): the signature line (listing line 213) is missing from this
// extract; the body uses `time`, presumably a dtn::core::TimeEvent - confirm.
// On a TIME_SECOND_TICK action, expires meta store entries against the
// event's timestamp while holding _meta_lock. Other actions are ignored.
214  {
215  if (time.getAction() == dtn::core::TIME_SECOND_TICK)
216  {
217  ibrcommon::RWLock l(_meta_lock);
218  _metastore.expire(time.getTimestamp());
219  }
220  }
221 
// Returns the fixed name string "SimpleBundleStorage".
222  const std::string SimpleBundleStorage::getName() const
223  {
224  return "SimpleBundleStorage";
225  }
226 
// NOTE(review): the signature line (listing line 227) is missing from this
// extract; presumably this is the emptiness query of the storage - confirm.
// Returns _metastore.empty(), evaluated while holding _meta_lock.
228  {
229  ibrcommon::MutexLock l(_meta_lock);
230  return _metastore.empty();
231  }
232 
// NOTE(review): the signature line (listing line 233) is missing from this
// extract; presumably this is the custody release hook - confirm.
// Intentionally performs no action; see the note inside the body.
234  {
235  // custody was successfully transferred to another node.
236  // it is safe to delete this bundle now. (depending on the routing algorithm.)
237  }
238 
// NOTE(review): the signature line (listing line 239) is missing from this
// extract; presumably this is the bundle count query - confirm.
// Returns _metastore.size(), evaluated while holding _meta_lock.
240  {
241  ibrcommon::MutexLock l(_meta_lock);
242  return _metastore.size();
243  }
244 
// NOTE(review): the signature line (listing line 245) is missing from this
// extract - confirm the method name against the original file.
// Delegates to _datastore.wait().
246  {
247  _datastore.wait();
248  }
249 
// NOTE(review): the signature line (listing line 250) is missing from this
// extract; the body uses `mode` - confirm name and parameter type.
// Stores the mode in _faulty and passes it on to the data store. Within this
// file _faulty is read by the bundle lookup, which fails deliberately when
// it is set (marked there as a unit-testing mechanism).
251  {
252  _faulty = mode;
253  _datastore.setFaulty(mode);
254  }
255 
// NOTE(review): the signature line (listing line 256) is missing from this
// extract; the body uses `cb` and `result`, presumably a selection callback
// and its result container - confirm.
// Walks the meta store under _meta_lock and offers every non-expired entry
// to cb.addIfSelected(). Stops once cb.limit() entries were added; a limit
// of 0 disables the cap. Throws NoBundleFoundException when nothing was added.
257  {
258  size_t items_added = 0;
259 
260  // we have to iterate through all bundles
261  ibrcommon::MutexLock l(_meta_lock);
262 
263  for (MetaStorage::const_iterator iter = _metastore.begin(); (iter != _metastore.end()) && ((cb.limit() == 0) || (items_added < cb.limit())); ++iter)
264  {
265  const dtn::data::MetaBundle &meta = (*iter);
266 
267  // skip expired bundles
268  if ( dtn::utils::Clock::isExpired( meta ) ) continue;
269 
270  if ( cb.addIfSelected(result, meta) ) items_added++;
271  }
272 
273  if (items_added == 0) throw NoBundleFoundException();
274  }
275 
// NOTE(review): the signature line (listing line 276) is missing from this
// extract; the body uses `id` and returns a bundle - confirm.
// Looks the bundle up in the meta store, serves it from the pending bundles
// if it is still there, and otherwise deserializes it from the data store.
// If the bundle carries an AgeBlock, the difference between the stream's
// last-access and last-modify times is added to it.
// A SerializationFailedException (including the deliberate one raised when
// _faulty is set) is logged, the bundle is removed, and a
// BundleStorage::BundleLoadException is thrown instead.
// NOTE(review): listing line 324 is missing from this extract, so the
// handler closing the AgeBlock try-block is not visible here.
277  {
278  try {
279  ibrcommon::MutexLock l(_meta_lock);
280 
281  // faulty mechanism for unit-testing
282  if (_faulty) {
283  throw dtn::SerializationFailedException("bundle get failed due to faulty setting");
284  }
285 
286  // search for the bundle in the meta storage
287  const dtn::data::MetaBundle &meta = _metastore.find(dtn::data::MetaBundle::create(id));
288 
289  // create a hash for the data storage
290  DataStorage::Hash hash(BundleContainer::createId(meta));
291 
292  // check pending bundles
293  {
294  ibrcommon::MutexLock l(_pending_lock);
295 
296  pending_map::iterator it = _pending_bundles.find(hash);
297 
298  if (_pending_bundles.end() != it)
299  {
300  return it->second;
301  }
302  }
303 
304  try {
305  DataStorage::istream stream = _datastore.retrieve(hash);
306 
307  // load the bundle from the storage
308  dtn::data::Bundle bundle;
309 
310  // load the bundle from file
311  try {
312  dtn::data::DefaultDeserializer(*stream) >> bundle;
313  } catch (const std::exception &ex) {
    // map every deserialization error onto the type handled by the outer catch
314  throw dtn::SerializationFailedException(ex.what());
315  }
316 
317  try {
318  dtn::data::AgeBlock &agebl = bundle.find<dtn::data::AgeBlock>();
319 
320  // modify the AgeBlock with the age of the file
321  time_t age = stream.lastaccess() - stream.lastmodify();
322 
323  agebl.addSeconds(age);
325 
326  return bundle;
327  } catch (const DataStorage::DataNotAvailableException &ex) {
328  throw dtn::SerializationFailedException(ex.what());
329  }
330  } catch (const dtn::SerializationFailedException &ex) {
331  // bundle loading failed
332  IBRCOMMON_LOGGER_TAG(SimpleBundleStorage::TAG, error) << "failed to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
333 
334  // the bundle is broken, delete it
335  remove(id);
336 
337  throw BundleStorage::BundleLoadException(ex.what());
338  }
339 
340  throw NoBundleFoundException();
341  }
342 
// NOTE(review): the signature line (listing line 343) is missing from this
// extract - confirm the method name against the original file.
// Returns _metastore.getDistinctDestinations(), evaluated under _meta_lock.
344  {
345  ibrcommon::MutexLock l(_meta_lock);
346  return _metastore.getDistinctDestinations();
347  }
348 
// NOTE(review): the signature line (listing line 349) is missing from this
// extract; the body uses `bundle` - confirm.
// Computes the serialized length of the bundle, reserves that much space and
// stores the bundle. If acceptCustody() succeeds, a copy with the new
// custodian is stored; on any ibrcommon::Exception from the try-block the
// unmodified bundle is stored instead.
// NOTE(review): `meta` is used below but its declaration (listing line 361)
// is missing from this extract.
// NOTE(review): the catch also covers an ibrcommon::Exception thrown by the
// first __store() call, in which case __store() runs a second time - confirm
// this is intended.
350  {
351  // get the bundle size
    // the serializer is only asked for the length here; this block writes nothing through it
352  dtn::data::DefaultSerializer s(std::cout);
353  const dtn::data::Length bundle_size = s.getLength(bundle);
354 
355  // allocate space for the bundle
356  allocSpace(bundle_size);
357 
358  // accept custody if requested
359  try {
360  // create meta data object
362 
363  // accept custody
364  const dtn::data::EID custodian = BundleStorage::acceptCustody(meta);
365 
366  // container for the custody accepted bundle
367  dtn::data::Bundle ca_bundle = bundle;
368 
369  // set the new custodian
370  ca_bundle.custodian = custodian;
371 
372  // store the bundle with the custodian
373  __store(ca_bundle, bundle_size);
374  } catch (const ibrcommon::Exception&) {
375  // no custody has been requested - go on with standard store procedure
376  __store(bundle, bundle_size);
377  }
378  }
379 
// NOTE(review): the signature line (listing line 380) is missing from this
// extract; the body uses `id` - confirm.
// Returns _metastore.contains(id), evaluated while holding _meta_lock.
381  {
382  ibrcommon::MutexLock l(_meta_lock);
383 
384  // search for the bundle in the meta storage
385  return _metastore.contains(id);
386  }
387 
// NOTE(review): the signature line (listing line 388) is missing from this
// extract; the body uses `id` - confirm.
// Returns the meta store entry found for the given id, looked up while
// holding _meta_lock.
389  {
390  ibrcommon::MutexLock l(_meta_lock);
391 
392  // search for the bundle in the meta storage
393  return _metastore.find(dtn::data::MetaBundle::create(id));
394  }
395 
// NOTE(review): the signature line (listing line 396) is missing from this
// extract; the body uses `id` - confirm.
// Looks up the bundle under _meta_lock. Unless it is already marked as
// removed, it is marked, its removal from the data store is requested and
// the bundle-removed event is raised. A second call for the same bundle is
// therefore a no-op.
397  {
398  ibrcommon::MutexLock l(_meta_lock);
399  const dtn::data::MetaBundle &meta = _metastore.find(dtn::data::MetaBundle::create(id));
400 
401  // first check if the bundle is already marked as removed
402  if (!_metastore.isRemoved(meta))
403  {
404  // mark it as removed in the meta storage
405  _metastore.markRemoved(meta);
406 
407  // create the hash for data storage removal
408  DataStorage::Hash hash(BundleContainer::createId(meta));
409 
410  // create a background task for removing the bundle
411  _datastore.remove(hash);
412 
413  // raise bundle removed event
414  eventBundleRemoved(meta);
415  }
416  }
417 
// Stores a bundle whose space has already been accounted for by the caller.
// bundle:      the bundle to store
// bundle_size: the size recorded for it in the meta store
// Registers the bundle as pending, records it in the meta store, hands a
// newly allocated container to the data store and raises the bundle-added
// event.
// NOTE(review): `meta` is used below but its declaration (listing line 421)
// is missing from this extract.
418  void SimpleBundleStorage::__store(const dtn::data::Bundle &bundle, const dtn::data::Length &bundle_size)
419  {
420  // create meta bundle object
422 
423  // create a new container and hash
424  std::auto_ptr<BundleContainer> bc(new BundleContainer(bundle));
425  DataStorage::Hash hash(*bc);
426 
427  // enter critical section - lock pending bundles
428  {
429  ibrcommon::RWLock l(_pending_lock);
430 
431  // add bundle to the pending bundles
432  _pending_bundles[hash] = bundle;
433  }
434 
435  // enter critical section - lock all data structures
436  {
437  ibrcommon::RWLock l(_meta_lock);
438 
439  // add the new bundles to the meta storage
440  _metastore.store(meta, bundle_size);
441  }
442 
443  // put the bundle into the data store
    // the auto_ptr gives up ownership only after store() has returned;
    // presumably the data store owns the container from then on - confirm
444  _datastore.store(hash, bc.get());
445  bc.release();
446 
447  // raise bundle added event
448  eventBundleAdded(meta);
449  }
450 
// NOTE(review): the signature line (listing line 451) is missing from this
// extract; presumably this is the "clear storage" operation - confirm.
// Requests removal from the data store for every entry of the meta store
// while holding _meta_lock. The meta store itself is not modified here.
452  {
453  ibrcommon::RWLock l(_meta_lock);
454 
455  // mark all bundles for deletion
456  for (MetaStorage::const_iterator iter = _metastore.begin(); iter != _metastore.end(); ++iter)
457  {
458  // remove item in the bundlelist
459  const dtn::data::MetaBundle &meta = (*iter);
460 
461  DataStorage::Hash hash(BundleContainer::createId(meta));
462 
463  // create a background task for removing the bundle
464  _datastore.remove(hash);
465  }
466  }
467 
// NOTE(review): the signature line (listing line 468) is missing from this
// extract; the body uses `b`, presumably the meta data of the expired
// bundle - confirm.
// Requests removal of the bundle from the data store and raises the
// bundle-removed event.
// NOTE(review): the statements announced by the "raise bundle event" and
// "raise an event" comments (listing lines 476 and 479) are missing from
// this extract.
469  {
470  DataStorage::Hash hash(BundleContainer::createId(b));
471 
472  // create a background task for removing the bundle
473  _datastore.remove(hash);
474 
475  // raise bundle event
477 
478  // raise an event
480 
481  // raise bundle removed event
482  eventBundleRemoved(b);
483  }
484 
// Initializes the container's _bundle member from the given bundle.
485  SimpleBundleStorage::BundleContainer::BundleContainer(const dtn::data::Bundle &b)
486  : _bundle(b)
487  { }
488 
// Destructor with an empty body.
489  SimpleBundleStorage::BundleContainer::~BundleContainer()
490  { }
491 
// Returns the identifier createId() generates for the contained bundle.
492  std::string SimpleBundleStorage::BundleContainer::getId() const
493  {
494  return createId(_bundle);
495  }
496 
497  std::string SimpleBundleStorage::BundleContainer::createId(const dtn::data::BundleID &id)
498  {
499  std::stringstream ss_hash, ss_raw;
500  ss_raw << id;
501 
502  int c = 0xff & ss_raw.get();
503  while (ss_raw.good())
504  {
505  ss_hash << std::hex << std::setw( 2 ) << std::setfill( '0' ) << c;
506  c = 0xff & ss_raw.get();
507  }
508 
509  return ss_hash.str();
510  }
511 
// Writes the contained bundle to the given stream and flushes it.
// Returns the same stream so that calls can be chained.
// Throws dtn::SerializationFailedException if the stream is not good()
// after the flush, or if the write position is smaller than the length the
// serializer computed for the bundle.
// NOTE(review): the serializer `s` is used below but its declaration
// (listing line 515) is missing from this extract.
512  std::ostream& SimpleBundleStorage::BundleContainer::serialize(std::ostream &stream)
513  {
514  // get a serializer for bundles
516 
517  // length of the bundle
518  dtn::data::Length size = s.getLength(_bundle);
519 
520  // serialize the bundle
521  s << _bundle; stream.flush();
522 
523  // check the streams health
524  if (!stream.good())
525  {
526  std::stringstream ss; ss << "Output stream went bad [" << std::strerror(errno) << "]";
527  throw dtn::SerializationFailedException(ss.str());
528  }
529 
530  // get the write position
    // fewer bytes written than the computed length is treated as a failed write
531  if (static_cast<std::streamoff>(size) > stream.tellp())
532  {
533  std::stringstream ss; ss << "Not all data were written [" << stream.tellp() << " of " << size << " bytes]";
534  throw dtn::SerializationFailedException(ss.str());
535  }
536 
537  // return the stream, this allows stacking
538  return stream;
539  }
540  }
541 }
priority_set::const_iterator const_iterator
Definition: MetaStorage.h:71
void markRemoved(const dtn::data::MetaBundle &meta)
virtual void eventDataStorageStoreFailed(const dtn::storage::DataStorage::Hash &hash, const ibrcommon::Exception &)
void remove(const dtn::data::BundleID &id)
virtual dtn::data::MetaBundle info(const dtn::data::BundleID &id)
virtual void eventDataStorageStored(const dtn::storage::DataStorage::Hash &hash)
static void add(EventReceiver< E > *receiver)
std::set< dtn::data::EID > eid_set
Definition: BundleSeeker.h:39
const dtn::data::MetaBundle & find(const T &id) const
Definition: MetaStorage.h:86
virtual Length getLength(const dtn::data::Bundle &obj)
Definition: Serializer.cpp:382
void addSeconds(const dtn::data::Number &value)
Definition: AgeBlock.cpp:71
void setFaulty(bool mode)
size_t Length
Definition: Number.h:33
dtn::data::Length remove(const dtn::data::MetaBundle &meta)
Definition: MetaStorage.cpp:94
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)
const dtn::data::EID acceptCustody(const dtn::data::MetaBundle &meta)
static void remove(const EventReceiver< E > *receiver)
void releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
virtual void eventDataStorageRemoved(const dtn::storage::DataStorage::Hash &hash)
void raiseEvent(const dtn::core::TimeEvent &evt)
std::set< dtn::data::EID > getDistinctDestinations() const
Definition: MetaStorage.cpp:65
void freeSpace(const dtn::data::Length &size)
virtual const eid_set getDistinctDestinations()
virtual bool contains(const dtn::data::BundleID &id)
static void raise(const dtn::data::Bundle &bundle)
void allocSpace(const dtn::data::Length &size)
virtual void eventBundleExpired(const dtn::data::MetaBundle &b)
void eventBundleAdded(const dtn::data::MetaBundle &b)
bool isRemoved(const dtn::data::MetaBundle &meta) const
void store(const dtn::data::MetaBundle &meta, const dtn::data::Length &space)
Definition: MetaStorage.cpp:82
SimpleBundleStorage(const ibrcommon::File &workdir, const dtn::data::Length maxsize=0, const unsigned int buffer_limit=0)
DataStorage::istream retrieve(const Hash &hash)
virtual const std::string getName() const
virtual void eventDataStorageRemoveFailed(const dtn::storage::DataStorage::Hash &hash, const ibrcommon::Exception &)
void eventBundleRemoved(const dtn::data::BundleID &id)
size_t Size
Definition: Number.h:34
const Hash store(Container *data)
iterator find(block_t blocktype)
Definition: Bundle.cpp:307
static void raise(const dtn::data::MetaBundle &bundle, EventBundleAction action, dtn::data::StatusReportBlock::REASON_CODE reason=dtn::data::StatusReportBlock::NO_ADDITIONAL_INFORMATION)
Definition: BundleEvent.cpp:78
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
virtual void iterateDataStorage(const dtn::storage::DataStorage::Hash &hash, dtn::storage::DataStorage::istream &stream)
bool contains(const dtn::data::BundleID &id) const
Definition: MetaStorage.cpp:37
void remove(const Hash &hash)
static bool isExpired(const dtn::data::Timestamp &timestamp, const dtn::data::Number &lifetime)
Definition: Clock.cpp:155
virtual void store(const dtn::data::Bundle &bundle)