IBR-DTN  1.0.0
DataStorage.cpp
Go to the documentation of this file.
1 /*
2  * DataStorage.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "storage/DataStorage.h"
23 #include <typeinfo>
24 #include <sstream>
25 #include <iomanip>
26 #include <list>
27 
28 #include <string.h>
29 #include <stdlib.h>
30 #include <iostream>
31 #include <fstream>
32 #include <cstring>
33 #include <cerrno>
34 
35 namespace dtn
36 {
37  namespace storage
38  {
40  : value("this-hash-value-is-empty")
41  {}
42 
43  DataStorage::Hash::Hash(const std::string &v)
44  : value(v)
45  {
46  }
47 
49  : value(container.getId())
50  { }
51 
52  DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {}
54 
56  {
57  return (value != other.value);
58  }
59 
61  {
62  return (value == other.value);
63  }
64 
66  {
67  return (value < other.value);
68  }
69 
70  DataStorage::istream::istream(ibrcommon::Mutex &mutex, const ibrcommon::File &file)
71  : ibrcommon::File(file), _stream(NULL), _lock(mutex)
72  {
73  _lock.enter();
74  _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary);
75  }
76 
78  {
79  if (_stream != NULL)
80  {
81  delete _stream;
82  _lock.leave();
83  }
84  }
85 
87  { return *_stream; }
88 
89  DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, unsigned int write_buffer, bool initialize)
90  : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0), _faulty(false)
91  // limit the number of bundles in the write buffer
92  {
93  // initialize the storage
94  if (initialize)
95  {
96  if (_path.exists())
97  {
98  // remove all files in the path
99  std::list<ibrcommon::File> files;
100  _path.getFiles(files);
101 
102  for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); ++iter)
103  {
104  (*iter).remove(true);
105  }
106  }
107  else
108  {
109  // create the path
110  ibrcommon::File::createDirectory(_path);
111  }
112  }
113  }
114 
116  {
117  _tasks.abort();
118  join();
119 
120  // delete all task objects
121  try {
122  while (true)
123  {
124  Task *t = _tasks.take();
125  delete t;
126  }
127  } catch (const ibrcommon::QueueUnblockedException&) {
128  // exit
129  }
130  }
131 
133  {
134  JoinableThread::reset();
135  }
136 
137  void DataStorage::setFaulty(bool mode)
138  {
139  _faulty = mode;
140  }
141 
143  {
144  std::list<ibrcommon::File> files;
145  _path.getFiles(files);
146 
147  for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); ++iter)
148  {
149  if (!(*iter).isSystem() && !(*iter).isDirectory())
150  {
151  DataStorage::Hash hash(*iter);
152  DataStorage::istream stream(_global_mutex, *iter);
153 
154  _callback.iterateDataStorage(hash, stream);
155  }
156  }
157  }
158 
160  {
161  // wait for resources
162  if (_store_limited) _store_sem.wait();
163 
164  // put the task into the queue
165  _tasks.push( new StoreDataTask(hash, data) );
166  }
167 
169  {
170  // create a corresponding hash
171  DataStorage::Hash hash(*data);
172  store(hash, data);
173  return hash;
174  }
175 
177  {
178  ibrcommon::File file = _path.get(hash.value);
179 
180  if (!file.exists())
181  {
182  throw DataNotAvailableException("file " + file.getPath() + " not found");
183  }
184 
185  return DataStorage::istream(_global_mutex, file);
186  }
187 
189  {
190  _tasks.push( new RemoveDataTask(hash) );
191  }
192 
194  {
195  _tasks.wait(ibrcommon::Queue< Task* >::QUEUE_EMPTY);
196  }
197 
199  {
200  _tasks.abort();
201  }
202 
203  void DataStorage::run() throw ()
204  {
205  try {
206  while (true)
207  {
208  _tasks.wait(ibrcommon::Queue<Task*>::QUEUE_NOT_EMPTY);
209  Task *t = _tasks.front();
210 
211  try {
212  StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t);
213 
214  try {
215  ibrcommon::File destination = _path.get(store.hash.value);
216 
217  {
218  ibrcommon::MutexLock l(_global_mutex);
219  std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc);
220 
221  // check the streams health
222  if (!stream.good() || _faulty)
223  {
224  std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]";
225  throw ibrcommon::IOException(ss.str());
226  }
227 
228  store._container->serialize(stream);
229  stream.close();
230  }
231 
232  // release resources
233  if (_store_limited) _store_sem.post();
234 
235  // notify the stored item
236  _callback.eventDataStorageStored(store.hash);
237  } catch (const ibrcommon::Exception &ex) {
238  // release resources
239  if (_store_limited) _store_sem.post();
240 
241  // notify the fail of store action
242  _callback.eventDataStorageStoreFailed(store.hash, ex);
243  }
244  } catch (const std::bad_cast&) {
245  }
246 
247  try {
248  RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t);
249 
250  try {
251  ibrcommon::File destination = _path.get(remove.hash.value);
252  {
253  ibrcommon::MutexLock l(_global_mutex);
254  if (!destination.exists())
255  {
257  }
258  destination.remove();
259  }
260  _callback.eventDataStorageRemoved(remove.hash);
261  } catch (const ibrcommon::Exception &ex) {
262  _callback.eventDataStorageRemoveFailed(remove.hash, ex);
263  }
264  } catch (const std::bad_cast&) {
265 
266  }
267 
268  delete t;
269  _tasks.pop();
270  }
271  } catch (const ibrcommon::QueueUnblockedException&) {
272  // exit
273  }
274  }
275 
277  DataStorage::Task::~Task() {}
278 
279  DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c)
280  : hash(h), _container(c)
281  {}
282 
283  DataStorage::StoreDataTask::~StoreDataTask()
284  {
285  }
286 
287  DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h)
288  {}
289 
290  DataStorage::RemoveDataTask::~RemoveDataTask()
291  {
292  }
293  }
294 }
virtual void eventDataStorageRemoveFailed(const Hash &hash, const ibrcommon::Exception &)=0
istream(ibrcommon::Mutex &mutex, const ibrcommon::File &file)
Definition: DataStorage.cpp:70
void setFaulty(bool mode)
DataStorage(Callback &callback, const ibrcommon::File &path, unsigned int write_buffer=0, bool initialize=false)
Definition: DataStorage.cpp:89
int h
Definition: dtnrecv.cpp:53
bool operator<(const Hash &other) const
Definition: DataStorage.cpp:65
virtual void eventDataStorageRemoved(const Hash &hash)=0
bool operator==(const Hash &other) const
Definition: DataStorage.cpp:60
bool operator!=(const Hash &other) const
Definition: DataStorage.cpp:55
DataStorage::istream retrieve(const Hash &hash)
virtual void iterateDataStorage(const Hash &hash, DataStorage::istream &stream)=0
const Hash store(Container *data)
virtual void eventDataStorageStored(const Hash &hash)=0
void remove(const Hash &hash)
virtual void eventDataStorageStoreFailed(const Hash &hash, const ibrcommon::Exception &)=0