IBR-DTN  1.0.0
SQLiteBundleStorage.cpp
Go to the documentation of this file.
1 /*
2  * SQLiteBundleStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  * Written-by: Matthias Myrtus
8  *
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  * http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  *
21  */
22 
23 
25 #include "core/EventDispatcher.h"
26 #include "core/BundleCore.h"
27 #include "core/BundleEvent.h"
28 
31 #include <ibrdtn/data/AgeBlock.h>
32 #include <ibrdtn/data/Serializer.h>
33 #include <ibrdtn/data/Bundle.h>
34 #include <ibrdtn/data/BundleID.h>
35 
36 #include <ibrcommon/thread/MutexLock.h>
37 #include <ibrcommon/thread/RWLock.h>
38 #include <ibrcommon/data/BLOB.h>
39 #include <ibrcommon/Logger.h>
40 #include <memory>
41 #include <unistd.h>
42 
43 namespace dtn
44 {
45  namespace storage
46  {
47  const std::string SQLiteBundleStorage::TAG = "SQLiteBundleStorage";
48 
49  ibrcommon::Mutex SQLiteBundleStorage::TaskIdle::_mutex;
50  bool SQLiteBundleStorage::TaskIdle::_idle = false;
51 
52  SQLiteBundleStorage::SQLiteBLOB::SQLiteBLOB(const ibrcommon::File &path)
53  : _file(path, "blob")
54  {
55  }
56 
57  SQLiteBundleStorage::SQLiteBLOB::~SQLiteBLOB()
58  {
59  // delete the file if the last reference is destroyed
60  _file.remove();
61  }
62 
63  void SQLiteBundleStorage::SQLiteBLOB::clear()
64  {
65  // close the file
66  _filestream.close();
67 
68  // open temporary file
69  _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::trunc | ios::binary );
70 
71  if (!_filestream.is_open())
72  {
73  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
74  throw ibrcommon::CanNotOpenFileException(_file);
75  }
76  }
77 
78  void SQLiteBundleStorage::SQLiteBLOB::open()
79  {
80  ibrcommon::BLOB::_filelimit.wait();
81 
82  // open temporary file
83  _filestream.open(_file.getPath().c_str(), ios::in | ios::out | ios::binary );
84 
85  if (!_filestream.is_open())
86  {
87  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "can not open temporary file " << _file.getPath() << IBRCOMMON_LOGGER_ENDL;
88  throw ibrcommon::CanNotOpenFileException(_file);
89  }
90  }
91 
92  void SQLiteBundleStorage::SQLiteBLOB::close()
93  {
94  // flush the filestream
95  _filestream.flush();
96 
97  // close the file
98  _filestream.close();
99 
100  ibrcommon::BLOB::_filelimit.post();
101  }
102 
103  std::streamsize SQLiteBundleStorage::SQLiteBLOB::__get_size()
104  {
105  return _file.size();
106  }
107 
108  ibrcommon::BLOB::Reference SQLiteBundleStorage::create()
109  {
110  return ibrcommon::BLOB::Reference(new SQLiteBLOB(_blobPath));
111  }
112 
/**
 * Set up the storage below the given path.
 *
 * The database is created on path.get("sqlite.db"); block files go to
 * path.get("blocks") and BLOB files to path.get("blob"). The BLOB folder is
 * removed and re-created, the block folder is created, and the database is
 * opened. An ibrcommon::Exception raised during these steps is logged as
 * critical and not propagated, so construction completes in that case too.
 *
 * @param path Base directory of the storage.
 * @param maxsize Passed to the BundleStorage base class.
 * @param usePersistentBundleSets See the NOTE(review) at the `if` below.
 */
113  SQLiteBundleStorage::SQLiteBundleStorage(const ibrcommon::File &path, const dtn::data::Length &maxsize, bool usePersistentBundleSets)
114  : BundleStorage(maxsize), _database(path.get("sqlite.db"), *this)
115  {
116  //let the factory create SQLiteBundleSets
117  if (usePersistentBundleSets)
 // NOTE(review): the statement guarded by this `if` is missing from this
 // listing (original line 118). As written here the `if` governs the
 // changeProvider() call below - restore the statement from the upstream file.
119 
120  // use sqlite storage as BLOB provider, auto delete off
121  ibrcommon::BLOB::changeProvider(this, false);
122 
123  // set the block path
124  _blockPath = path.get("blocks");
125  _blobPath = path.get("blob");
126 
127  try {
128  ibrcommon::RWLock l(_global_lock);
129 
130  // delete all old BLOB container
131  _blobPath.remove(true);
132 
133  // create BLOB folder
134  ibrcommon::File::createDirectory( _blobPath );
135 
136  // create the bundle folder
137  ibrcommon::File::createDirectory( _blockPath );
138 
139  // open the database and create all folders and files if needed
140  _database.open();
141  } catch (const ibrcommon::Exception &ex) {
 // setup errors are only logged; the object is still constructed
142  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
143  }
144  }
145 
147  {
 // NOTE(review): the signature line of this body is missing from this
 // listing; the body closes the database, which suggests the storage
 // destructor - confirm against the upstream file.
148  // stop factory from creating SQLiteBundleSets
 // NOTE(review): the statement belonging to the comment above is missing
 // from this listing (original line 149).
150 
151  try {
152  ibrcommon::RWLock l(_global_lock);
153 
154  // close the database
155  _database.close();
156  } catch (const ibrcommon::Exception &ex) {
 // errors while closing are logged and not propagated
157  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
158  }
159  }
160 
162  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Worker loop: takes tasks from _tasks and runs them one at a time until
 // poll() raises ibrcommon::QueueUnblockedException.
163  // loop until aborted
164  try {
165  while (true)
166  {
167  Task *t = _tasks.poll();
168 
169  try {
 // blocking tasks are signalled via done()/abort() and are not deleted here
170  BlockingTask &btask = dynamic_cast<BlockingTask&>(*t);
171  try {
172  btask.run(*this);
173  } catch (const std::exception&) {
174  btask.abort();
175  continue;
176  };
177  btask.done();
178  continue;
179  } catch (const std::bad_cast&) { };
180 
 // every other task is deleted by `killer` once run() returns or throws
181  try {
182  std::auto_ptr<Task> killer(t);
183  t->run(*this);
184  } catch (const std::exception&) { };
185  }
186  } catch (const ibrcommon::QueueUnblockedException &ex) {
187  // we are aborted, abort all blocking tasks
 // NOTE(review): the handler is empty - no task is aborted here
188  }
189  }
190 
192  {
 // NOTE(review): the signature line of this body is missing from this listing.
193  // routine checked for throw() on 15.02.2013
194 
195  //register Events
 // NOTE(review): the registration statements are missing from this listing
 // (original lines 196-197).
198 
199  try {
200  // iterate through all bundles to generate indexes
201  _database.iterateAll();
202  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
 // a failed query is logged and not propagated
203  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
204  }
205  }
206 
208  {
 // NOTE(review): the signature line of this body is missing from this listing.
209  // routine checked for throw() on 15.02.2013
210 
211  //unregister Events
 // NOTE(review): the unregistration statements are missing from this listing
 // (original lines 212-213).
214 
 // request the stop, then wait for completion via join()
215  stop();
216  join();
217  }
218 
220  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Aborts the task queue.
221  _tasks.abort();
222  }
223 
225  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Returns the result of _database.getDistinctDestinations() while holding
 // _global_lock.
226  try {
227  ibrcommon::MutexLock l(_global_lock);
228  return _database.getDistinctDestinations();
229  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
230  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
231  }
 // NOTE(review): the fallback return for the error path is missing from this
 // listing (original line 232).
233  }
234 
236  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Forwards the selector `cb` and the `result` container to the database
 // while holding _global_lock; exceptions are not caught here.
237  ibrcommon::MutexLock l(_global_lock);
238  _database.get(cb, result);
239  }
240 
242  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Loads the bundle identified by `id`: the primary data comes from the
 // database, every block is then restored from its file. Payload blocks are
 // attached as BLOB references, other blocks are read from the file.
 // NOTE(review): the declaration of `blocks` is missing from this listing
 // (original line 243).
244  dtn::data::Bundle bundle;
245 
246  try {
247  ibrcommon::MutexLock l(_global_lock);
248 
249  // query the data base for the bundle
250  _database.get(id, bundle, blocks);
251 
252  for (SQLiteDatabase::blocklist::const_iterator iter = blocks.begin(); iter != blocks.end(); ++iter)
253  {
254  const SQLiteDatabase::blocklist_entry &entry = (*iter);
255  const int blocktyp = entry.first;
256  const ibrcommon::File &file = entry.second;
257 
258  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 50) << "add block: " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
259 
260  // load block from file
261  std::ifstream is(file.getPath().c_str(), std::ios::binary | std::ios::in);
262 
263  if (blocktyp == dtn::data::PayloadBlock::BLOCK_TYPE)
264  {
265  // create a new BLOB object
266  SQLiteBLOB *blob = new SQLiteBLOB(_blobPath);
267 
268  // create a reference of the BLOB
269  ibrcommon::BLOB::Reference ref(blob);
270 
271  try {
272  // remove the temporary file
273  blob->_file.remove();
274 
275  // generate a hard-link, pointing to the BLOB file
276  if ( ::link(file.getPath().c_str(), blob->_file.getPath().c_str()) != 0 )
277  {
278  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "hard-link failed (" << errno << ") " << blob->_file.getPath() << " -> " << file.getPath() << IBRCOMMON_LOGGER_ENDL;
279 
280  // copy the BLOB into a new file if hard-links are not supported
 // NOTE(review): `fout` and `stream` both refer to the file of the new BLOB
 // (blob->_file), so this copies the just-created file onto itself; the
 // stored block file opened as `is` is not read in this branch. The source
 // of the copy presumably should be `is` - confirm and fix.
281  std::ofstream fout(blob->_file.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
282 
283  // open the filestream
284  ibrcommon::BLOB::iostream stream = ref.iostream();
285 
286  const std::streamsize length = stream.size();
287  ibrcommon::BLOB::copy(fout, (*stream), length);
288  }
289  else
290  {
291  // update BLOB size
292  blob->update();
293  }
294 
295  // add payload block to the bundle
296  bundle.push_back(ref);
297  } catch (const ibrcommon::Exception &ex) {
298  // remove the temporary file
299  blob->_file.remove();
300 
301  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, error) << "unable to load bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
302  throw dtn::SerializationFailedException(ex.what());
303  }
304  }
305  else
306  {
307  try {
308  // read the block
 // NOTE(review): the statement that reads the block and declares `block` is
 // missing from this listing (original line 309).
310 
311  // close the file
312  is.close();
313 
314  // modify the age block if present
315  try {
316  dtn::data::AgeBlock &agebl = dynamic_cast<dtn::data::AgeBlock&>(block);
317 
318  // modify the AgeBlock with the age of the file
 // the age is the difference between last access and last modification
319  time_t age = file.lastaccess() - file.lastmodify();
320 
321  agebl.addSeconds(age);
322  } catch (const std::bad_cast&) { };
 // NOTE(review): the `catch` line closing the outer `try` is missing from
 // this listing (original line 323).
324  // skip extensions block
325  }
326  }
327  }
328  } catch (const SQLiteDatabase::SQLiteQueryException &ex) {
329  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
330 
331  // try to purge all referenced data
332  remove(id);
333 
 // NOTE(review): a statement is missing from this listing here (original
 // line 334).
335  }
336 
337  return bundle;
338  }
339 
341  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Stores `bundle`: reserves space for its serialized length, writes the
 // bundle and one file per block inside a database transaction, commits,
 // optionally accepts custody and finally raises the bundle-added event.
 // An ibrcommon::Exception inside the try block is logged, the transaction is
 // rolled back and the reserved space is released; it is not re-thrown.
342  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "store bundle " << bundle.toString() << IBRCOMMON_LOGGER_ENDL;
343 
344  ibrcommon::RWLock l(_global_lock);
345 
346  // get size of the bundle
 // the serializer is only used to compute the length here
347  dtn::data::DefaultSerializer s(std::cout);
348  dtn::data::Length size = s.getLength(bundle);
349 
350  // increment the storage size
 // NOTE(review): allocSpace() and transaction() run outside the try block
 // below, so an exception from transaction() would leave the space reserved.
351  allocSpace(size);
352 
353  // start transaction to store the bundle
354  _database.transaction();
355 
356  try {
357  // store the bundle data in the database
358  _database.store(bundle, size);
359 
360  // create a bundle id
361  const dtn::data::BundleID &id = bundle;
 // NOTE(review): the declaration of `meta` (used further down) is missing
 // from this listing (original line 362).
363 
364  // index number for order of the blocks
365  int index = 1;
366 
367  // number of bytes stored
 // NOTE(review): storedBytes is accumulated but never read in the visible code
368  dtn::data::Length storedBytes = 0;
369 
370  for(dtn::data::Bundle::const_iterator it = bundle.begin() ;it != bundle.end(); ++it)
371  {
372  const dtn::data::Block &block = (**it);
373 
 // NOTE(review): the condition selecting this branch is missing from this
 // listing (original line 374); the branch handles payload blocks.
375  {
376  // create a temporary file
377  ibrcommon::TemporaryFile tmpfile(_blockPath, "payload");
378 
379  try {
380  const dtn::data::PayloadBlock &payload = dynamic_cast<const dtn::data::PayloadBlock&>(block);
381  ibrcommon::BLOB::Reference ref = payload.getBLOB();
382  ibrcommon::BLOB::iostream stream = ref.iostream();
383 
384  try {
 // a BLOB created by this storage can be hard-linked instead of copied
385  const SQLiteBLOB &blob = dynamic_cast<const SQLiteBLOB&>(*ref);
386 
387  // first remove the tmp file
388  tmpfile.remove();
389 
390  // make a hard-link to the origin blob file
391  if ( ::link(blob._file.getPath().c_str(), tmpfile.getPath().c_str()) != 0 )
392  {
393  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 25) << "hard-link failed (" << errno << ") " << tmpfile.getPath() << " -> " << blob._file.getPath() << IBRCOMMON_LOGGER_ENDL;
394 
395  // copy the BLOB into a new file if hard-links are not supported
396  std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
397 
398  const std::streamsize length = stream.size();
399  ibrcommon::BLOB::copy(fout, (*stream), length);
400  }
401  } catch (const std::bad_cast&) {
402  // copy the BLOB into a new file, this isn't a SQLite BLOB object
403  std::ofstream fout(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
404 
405  const std::streamsize length = stream.size();
406  ibrcommon::BLOB::copy(fout, (*stream), length);
407  }
408  } catch (const std::bad_cast&) {
 // the block could not be cast to a PayloadBlock
409  // remove the tmp file
410  tmpfile.remove();
411  throw ibrcommon::Exception("not a payload block");
412  }
413 
414  // determine the amount of stored bytes
415  storedBytes += tmpfile.size();
416 
417  // store the block into the database
418  _database.store(id, index, block, tmpfile);
419  }
420  else
421  {
 // any other block is serialized into its own file
422  ibrcommon::TemporaryFile tmpfile(_blockPath, "block");
423 
424  std::ofstream filestream(tmpfile.getPath().c_str(), std::ofstream::out | std::ofstream::trunc | std::ofstream::binary);
425  dtn::data::SeparateSerializer serializer(filestream);
426  serializer << block;
427  filestream.close();
428 
429  // determine the amount of stored bytes
430  storedBytes += tmpfile.size();
431 
432  // store the block into the database
433  _database.store(id, index, block, tmpfile);
434  }
435 
436  // increment index
437  index++;
438  }
439 
440  _database.commit();
441 
442  try {
443  // the bundle is stored successfully, we could accept custody if it is requested
444  const dtn::data::EID custodian = acceptCustody(meta);
445 
446  // update the custody address of this bundle
447  _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, bundle, custodian);
448  } catch (const ibrcommon::Exception&) {
449  // this bundle has no request for custody transfers
450  }
451 
452  IBRCOMMON_LOGGER_DEBUG_TAG(SQLiteBundleStorage::TAG, 10) << "bundle " << bundle.toString() << " stored" << IBRCOMMON_LOGGER_ENDL;
453 
454  // raise bundle added event
 // NOTE(review): this runs after commit() but still inside the try block; if
 // it can throw an ibrcommon::Exception, rollback() and freeSpace() below
 // would run for an already committed bundle - confirm.
455  eventBundleAdded(meta);
456  } catch (const ibrcommon::Exception &ex) {
457  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
458  _database.rollback();
459 
460  // free the previously allocated space
461  freeSpace(size);
462  }
463  }
464 
466  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Asks the database whether `id` is stored, holding _global_lock; a failed
 // query is reported as "not contained".
467  try {
468  ibrcommon::MutexLock l(_global_lock);
469  return _database.contains(id);
470  } catch (const SQLiteDatabase::SQLiteQueryException&) {
471  return false;
472  }
473  }
474 
476  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Fetches the data stored for `id` from the database into `ret`, holding
 // _global_lock, and returns it.
477  try {
478  ibrcommon::MutexLock l(_global_lock);
 // NOTE(review): the declaration of `ret` is missing from this listing
 // (original line 479).
480  _database.get(id, ret);
481  return ret;
482  } catch (const SQLiteDatabase::SQLiteQueryException&) {
 // NOTE(review): the statement of this handler is missing from this listing
 // (original line 483).
484  }
485  }
486 
488  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Removes the bundle `id` from the database, releases the space reported by
 // the database and raises the bundle-removed event. Errors are logged and
 // not propagated.
489  // remove the bundle in locked state
490  try {
491  ibrcommon::RWLock l(_global_lock);
 // _database.remove() returns the size that is handed to freeSpace()
492  freeSpace( _database.remove(id) );
493 
494  // raise bundle removed event
495  eventBundleRemoved(id);
496  } catch (const ibrcommon::Exception &ex) {
497  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
498  }
499  }
500 
502  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Empties the storage: clears the database, re-creates the block folder and
 // resets the space accounting. The folder and the accounting are reset even
 // if clearing the database failed (the error is only logged).
503  ibrcommon::RWLock l(_global_lock);
504 
505  try {
506  _database.clear();
507  } catch (const ibrcommon::Exception &ex) {
508  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
509  }
510 
511  // delete the block folder and create it again
512  _blockPath.remove(true);
513  ibrcommon::File::createDirectory(_blockPath);
514 
515  // set the storage size to zero
516  clearSpace();
517  }
518 
520  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Returns _database.empty() while holding _global_lock; if the query fails
 // the error is logged and `true` is returned.
521  try {
522  ibrcommon::MutexLock l(_global_lock);
523  return _database.empty();
524  } catch (const ibrcommon::Exception &ex) {
525  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
526  }
527  return true;
528  }
529 
531  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Returns _database.count() while holding _global_lock; if the query fails
 // the error is logged and 0 is returned.
532  try {
533  ibrcommon::MutexLock l(_global_lock);
534  return _database.count();
535  } catch (const ibrcommon::Exception &ex) {
536  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
537  }
538  return 0;
539  }
540 
542  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // On every TIME_SECOND_TICK event a TaskExpire carrying the event timestamp
 // is queued; the task itself runs later on the thread draining _tasks.
543  if (time.getAction() == dtn::core::TIME_SECOND_TICK)
544  {
545  _tasks.push(new TaskExpire(time.getTimestamp()));
546  }
547  }
548 
550  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Tracks the global idle state: GLOBAL_IDLE sets the shared idle flag and
 // queues a TaskIdle, GLOBAL_BUSY clears the flag. Both branches change the
 // flag while holding TaskIdle::_mutex.
551  if (global.getAction() == dtn::core::GlobalEvent::GLOBAL_IDLE)
552  {
553  // switch to idle mode
554  ibrcommon::MutexLock l(TaskIdle::_mutex);
555  TaskIdle::_idle = true;
556 
557  // generate an idle task
558  _tasks.push(new TaskIdle());
559  }
560  else if (global.getAction() == dtn::core::GlobalEvent::GLOBAL_BUSY)
561  {
562  // switch back to non-idle mode
563  ibrcommon::MutexLock l(TaskIdle::_mutex);
564  TaskIdle::_idle = false;
565  }
566  }
567 
568  void SQLiteBundleStorage::TaskExpire::run(SQLiteBundleStorage &storage)
569  {
570  try {
571  ibrcommon::RWLock l(storage._global_lock);
572  storage._database.expire(_timestamp);
573  } catch (const ibrcommon::Exception &ex) {
574  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
575  }
576  }
577 
578  void SQLiteBundleStorage::TaskIdle::run(SQLiteBundleStorage &storage)
579  {
580  // until IDLE is false
581  while (true)
582  {
583  /*
584  * When an object (table, index, trigger, or view) is dropped from the database, it leaves behind empty space.
585  * This empty space will be reused the next time new information is added to the database. But in the meantime,
586  * the database file might be larger than strictly necessary. Also, frequent inserts, updates, and deletes can
587  * cause the information in the database to become fragmented - scrattered out all across the database file rather
588  * than clustered together in one place.
589  * The VACUUM command cleans the main database. This eliminates free pages, aligns table data to be contiguous,
590  * and otherwise cleans up the database file structure.
591  */
592  try {
593  ibrcommon::RWLock l(storage._global_lock);
594  storage._database.vacuum();
595  } catch (const ibrcommon::Exception &ex) {
596  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
597  }
598 
599  // here we can do some IDLE stuff...
600  ibrcommon::Thread::sleep(1000);
601 
602  ibrcommon::MutexLock l(TaskIdle::_mutex);
603  if (!TaskIdle::_idle) return;
604  }
605  }
606 
607  const std::string SQLiteBundleStorage::getName() const
608  {
609  return "SQLiteBundleStorage";
610  }
611 
613  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Records `custodian` as the custodian of bundle `id` in the database while
 // holding _global_lock. Errors are logged and not propagated.
614  try {
615  ibrcommon::MutexLock l(_global_lock);
616 
617  // custody was successfully transferred to another node.
618  // it is safe to delete this bundle now. (depending on the routing algorithm.)
619  // update the custodian of this bundle with the new one
620  _database.update(SQLiteDatabase::UPDATE_CUSTODIAN, id, custodian);
621  } catch (const ibrcommon::Exception &ex) {
622  IBRCOMMON_LOGGER_TAG(SQLiteBundleStorage::TAG, critical) << ex.what() << IBRCOMMON_LOGGER_ENDL;
623  }
624  }
625 
627  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Announces `bundle` as added and accounts `size` as used space.
628  // raise bundle added event
629  eventBundleAdded(bundle);
630 
631  // allocate consumed space of the bundle
632  allocSpace(size);
633  }
634 
636  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Announces bundle `id` as removed and releases `size` from the used space.
637  // raise bundle removed event
638  eventBundleRemoved(id);
639 
640  // release consumed space of this bundle
641  freeSpace(size);
642  }
643 
645  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Waits on the task queue with the QUEUE_EMPTY condition - presumably this
 // blocks until all queued tasks have been taken; confirm against
 // ibrcommon::Queue.
646  _tasks.wait(ibrcommon::Queue<Task*>::QUEUE_EMPTY);
647  }
648 
650  {
 // NOTE(review): the signature line of this body is missing from this listing.
 // Stores `mode` in _faulty and forwards it to the database.
651  _faulty = mode;
652  _database.setFaulty(mode);
653  }
654  }
655 }
std::string toString() const
Definition: BundleID.cpp:190
void releaseCustody(const dtn::data::EID &custodian, const dtn::data::BundleID &id)
static void add(EventReceiver< E > *receiver)
std::set< dtn::data::EID > eid_set
Definition: BundleSeeker.h:39
dtn::data::Length size() const
void raiseEvent(const dtn::core::TimeEvent &evt)
dtn::data::Bundle get(const dtn::data::BundleID &id)
virtual Length getLength(const dtn::data::Bundle &obj)
Definition: Serializer.cpp:382
void addSeconds(const dtn::data::Number &value)
Definition: AgeBlock.cpp:71
bool contains(const dtn::data::BundleID &id)
size_t Length
Definition: Number.h:33
virtual const eid_set getDistinctDestinations()
const dtn::data::EID acceptCustody(const dtn::data::MetaBundle &meta)
static void remove(const EventReceiver< E > *receiver)
SQLiteBundleStorage(const ibrcommon::File &path, const dtn::data::Length &maxsize, bool usePersistentBundleSets=false)
dtn::data::Block & readBlock()
Definition: Serializer.cpp:906
iterator begin()
Definition: Bundle.cpp:49
void freeSpace(const dtn::data::Length &size)
void remove(const dtn::data::BundleID &id)
void update(UPDATE_VALUES, const dtn::data::BundleID &id, const dtn::data::EID &)
void allocSpace(const dtn::data::Length &size)
void store(const dtn::data::Bundle &bundle, const dtn::data::Length &size)
void eventBundleAdded(const dtn::data::MetaBundle &b)
block_list::const_iterator const_iterator
Definition: Bundle.h:77
dtn::data::Size count() const
dtn::data::Length remove(const dtn::data::BundleID &id)
std::pair< int, const ibrcommon::File > blocklist_entry
static void setFactory(dtn::data::BundleSet::Factory *)
Definition: BundleSet.cpp:34
void expire(const dtn::data::Timestamp &timestamp)
virtual const eid_set getDistinctDestinations()
void iterateDatabase(const dtn::data::MetaBundle &bundle, const dtn::data::Length size)
void eventBundleExpired(const dtn::data::BundleID &id, const dtn::data::Length size)
static const dtn::data::block_t BLOCK_TYPE
Definition: PayloadBlock.h:38
void eventBundleRemoved(const dtn::data::BundleID &id)
size_t Size
Definition: Number.h:34
bool contains(const dtn::data::BundleID &id)
virtual dtn::data::MetaBundle info(const dtn::data::BundleID &id)
T & push_back()
Definition: Bundle.h:180
static MetaBundle create(const dtn::data::BundleID &id)
Definition: MetaBundle.cpp:34
ibrcommon::BLOB::Reference getBLOB() const
virtual void get(const BundleSelector &cb, BundleResult &result)
const block_t & getType() const
Definition: Block.h:73
iterator end()
Definition: Bundle.cpp:54
ibrcommon::BLOB::Reference create()
void store(const dtn::data::Bundle &bundle)
std::list< std::pair< int, const ibrcommon::File > > blocklist