Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "core/DataStorage.h"
00009 #include <typeinfo>
00010 #include <sstream>
00011 #include <iomanip>
00012 #include <list>
00013
00014 #include <string.h>
00015 #include <stdlib.h>
00016 #include <iostream>
00017 #include <fstream>
00018 #include <cstring>
00019 #include <cerrno>
00020
00021 namespace dtn
00022 {
00023 namespace core
00024 {
00025 DataStorage::Hash::Hash()
00026 : value("this-hash-value-is-empty")
00027 {}
00028
00029 DataStorage::Hash::Hash(const std::string &key)
00030 : value(DataStorage::Hash::hash(key))
00031 { }
00032
00033 DataStorage::Hash::Hash(const DataStorage::Container &container)
00034 : value(DataStorage::Hash::hash(container.getKey()))
00035 { };
00036
00037 DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {};
00038 DataStorage::Hash::~Hash() {};
00039
00040 bool DataStorage::Hash::operator==(const DataStorage::Hash &other) const
00041 {
00042 return (value == other.value);
00043 }
00044
00045 bool DataStorage::Hash::operator<(const DataStorage::Hash &other) const
00046 {
00047 return (value < other.value);
00048 }
00049
00050 std::string DataStorage::Hash::hash(const std::string &value)
00051 {
00052 std::stringstream ss;
00053 for (std::string::const_iterator iter = value.begin(); iter != value.end(); iter++)
00054 {
00055 ss << std::hex << std::setw( 2 ) << std::setfill( '0' ) << (int)(*iter);
00056 }
00057 return ss.str();
00058 }
00059
00060 DataStorage::istream::istream(ibrcommon::Mutex &mutex, const ibrcommon::File &file)
00061 : ibrcommon::File(file), _stream(NULL), _lock(mutex)
00062 {
00063 _lock.enter();
00064 _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary);
00065 };
00066
00067 DataStorage::istream::~istream()
00068 {
00069 if (_stream != NULL)
00070 {
00071 delete _stream;
00072 _lock.leave();
00073 }
00074 };
00075
00076 std::istream& DataStorage::istream::operator*()
00077 { return *_stream; }
00078
00079 DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, size_t write_buffer, bool initialize)
00080 : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0)
00081
00082 {
00083
00084 if (initialize)
00085 {
00086 if (_path.exists())
00087 {
00088
00089 std::list<ibrcommon::File> files;
00090 _path.getFiles(files);
00091
00092 for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++)
00093 {
00094 (*iter).remove(true);
00095 }
00096 }
00097 else
00098 {
00099
00100 ibrcommon::File::createDirectory(_path);
00101 }
00102 }
00103 }
00104
00105 DataStorage::~DataStorage()
00106 {
00107 _tasks.abort();
00108 join();
00109
00110
00111 try {
00112 while (true)
00113 {
00114 Task *t = _tasks.getnpop(false);
00115 delete t;
00116 }
00117 } catch (const ibrcommon::QueueUnblockedException&) {
00118
00119 }
00120 }
00121
00122 void DataStorage::iterateAll()
00123 {
00124 std::list<ibrcommon::File> files;
00125 _path.getFiles(files);
00126
00127 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++)
00128 {
00129 if (!(*iter).isSystem())
00130 {
00131 DataStorage::Hash hash(*iter);
00132 DataStorage::istream stream(_global_mutex, *iter);
00133
00134 _callback.iterateDataStorage(hash, stream);
00135 }
00136 }
00137 }
00138
00139 void DataStorage::store(const DataStorage::Hash &hash, DataStorage::Container *data)
00140 {
00141
00142 if (_store_limited) _store_sem.wait();
00143
00144
00145 _tasks.push( new StoreDataTask(hash, data) );
00146 }
00147
00148 const DataStorage::Hash DataStorage::store(DataStorage::Container *data)
00149 {
00150
00151 DataStorage::Hash hash(*data);
00152 store(hash, data);
00153 return hash;
00154 }
00155
00156 DataStorage::istream DataStorage::retrieve(const DataStorage::Hash &hash) throw (DataNotAvailableException)
00157 {
00158 ibrcommon::File file = _path.get(hash.value);
00159
00160 if (!file.exists())
00161 {
00162 throw DataNotAvailableException();
00163 }
00164
00165 return DataStorage::istream(_global_mutex, file);
00166 }
00167
00168 void DataStorage::remove(const DataStorage::Hash &hash)
00169 {
00170 _tasks.push( new RemoveDataTask(hash) );
00171 }
00172
00173 bool DataStorage::__cancellation()
00174 {
00175 _tasks.abort();
00176 return true;
00177 }
00178
00179 void DataStorage::run()
00180 {
00181 try {
00182 while (true)
00183 {
00184 Task *t = _tasks.getnpop(true);
00185
00186 try {
00187 StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t);
00188
00189 try {
00190 ibrcommon::File destination = _path.get(store.hash.value);
00191
00192 {
00193 ibrcommon::MutexLock l(_global_mutex);
00194 std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc);
00195
00196
00197 if (!stream.good())
00198 {
00199 std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]";
00200 throw ibrcommon::IOException(ss.str());
00201 }
00202
00203 store.container->serialize(stream);
00204 stream.close();
00205 }
00206
00207
00208 if (_store_limited) _store_sem.post();
00209
00210
00211 _callback.eventDataStorageStored(store.hash);
00212 } catch (const ibrcommon::Exception &ex) {
00213
00214 if (_store_limited) _store_sem.post();
00215
00216
00217 _callback.eventDataStorageStoreFailed(store.hash, ex);
00218 }
00219 } catch (const std::bad_cast&) {
00220 }
00221
00222 try {
00223 RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t);
00224
00225 try {
00226 ibrcommon::File destination = _path.get(remove.hash.value);
00227 {
00228 ibrcommon::MutexLock l(_global_mutex);
00229 if (!destination.exists())
00230 {
00231 throw DataNotAvailableException();
00232 }
00233 destination.remove();
00234 }
00235 _callback.eventDataStorageRemoved(remove.hash);
00236 } catch (const ibrcommon::Exception &ex) {
00237 _callback.eventDataStorageRemoveFailed(remove.hash, ex);
00238 }
00239 } catch (const std::bad_cast&) {
00240
00241 }
00242
00243 delete t;
00244 }
00245 } catch (const ibrcommon::QueueUnblockedException&) {
00246
00247 }
00248 }
00249
00250 DataStorage::Container::~Container() {};
00251 DataStorage::Task::~Task() {};
00252
00253 DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c)
00254 : hash(h), container(c)
00255 {}
00256
00257 DataStorage::StoreDataTask::~StoreDataTask()
00258 {
00259 delete container;
00260 }
00261
00262 DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h)
00263 {}
00264
00265 DataStorage::RemoveDataTask::~RemoveDataTask()
00266 {
00267 }
00268 }
00269 }