|
IBR-DTNSuite 0.6
|
00001 /* 00002 * DataStorage.cpp 00003 * 00004 * Created on: 22.11.2010 00005 * Author: morgenro 00006 */ 00007 00008 #include "core/DataStorage.h" 00009 #include <typeinfo> 00010 #include <sstream> 00011 #include <iomanip> 00012 #include <list> 00013 00014 #include <string.h> 00015 #include <stdlib.h> 00016 #include <iostream> 00017 #include <fstream> 00018 #include <cstring> 00019 #include <cerrno> 00020 00021 namespace dtn 00022 { 00023 namespace core 00024 { 00025 DataStorage::Hash::Hash() 00026 : value("this-hash-value-is-empty") 00027 {} 00028 00029 DataStorage::Hash::Hash(const std::string &key) 00030 : value(DataStorage::Hash::hash(key)) 00031 { } 00032 00033 DataStorage::Hash::Hash(const DataStorage::Container &container) 00034 : value(DataStorage::Hash::hash(container.getKey())) 00035 { }; 00036 00037 DataStorage::Hash::Hash(const ibrcommon::File &file) : value(file.getBasename()) {}; 00038 DataStorage::Hash::~Hash() {}; 00039 00040 bool DataStorage::Hash::operator==(const DataStorage::Hash &other) const 00041 { 00042 return (value == other.value); 00043 } 00044 00045 bool DataStorage::Hash::operator<(const DataStorage::Hash &other) const 00046 { 00047 return (value < other.value); 00048 } 00049 00050 std::string DataStorage::Hash::hash(const std::string &value) 00051 { 00052 std::stringstream ss; 00053 for (std::string::const_iterator iter = value.begin(); iter != value.end(); iter++) 00054 { 00055 ss << std::hex << std::setw( 2 ) << std::setfill( '0' ) << (int)(*iter); 00056 } 00057 return ss.str(); 00058 } 00059 00060 DataStorage::istream::istream(ibrcommon::Mutex &mutex, const ibrcommon::File &file) 00061 : ibrcommon::File(file), _stream(NULL), _lock(mutex) 00062 { 00063 _lock.enter(); 00064 _stream = new std::ifstream(getPath().c_str(), ios_base::in | ios_base::binary); 00065 }; 00066 00067 DataStorage::istream::~istream() 00068 { 00069 if (_stream != NULL) 00070 { 00071 delete _stream; 00072 _lock.leave(); 00073 } 00074 }; 00075 00076 std::istream& DataStorage::istream::operator*() 00077 { return *_stream; } 00078 00079 DataStorage::DataStorage(Callback &callback, const ibrcommon::File &path, size_t write_buffer, bool initialize) 00080 : _callback(callback), _path(path), _tasks(), _store_sem(write_buffer), _store_limited(write_buffer > 0) 00081 // limit the number of bundles in the write buffer 00082 { 00083 // initialize the storage 00084 if (initialize) 00085 { 00086 if (_path.exists()) 00087 { 00088 // remove all files in the path 00089 std::list<ibrcommon::File> files; 00090 _path.getFiles(files); 00091 00092 for (std::list<ibrcommon::File>::iterator iter = files.begin(); iter != files.end(); iter++) 00093 { 00094 (*iter).remove(true); 00095 } 00096 } 00097 else 00098 { 00099 // create the path 00100 ibrcommon::File::createDirectory(_path); 00101 } 00102 } 00103 } 00104 00105 DataStorage::~DataStorage() 00106 { 00107 _tasks.abort(); 00108 join(); 00109 00110 // delete all task objects 00111 try { 00112 while (true) 00113 { 00114 Task *t = _tasks.getnpop(false); 00115 delete t; 00116 } 00117 } catch (const ibrcommon::QueueUnblockedException&) { 00118 // exit 00119 } 00120 } 00121 00122 void DataStorage::iterateAll() 00123 { 00124 std::list<ibrcommon::File> files; 00125 _path.getFiles(files); 00126 00127 for (std::list<ibrcommon::File>::const_iterator iter = files.begin(); iter != files.end(); iter++) 00128 { 00129 if (!(*iter).isSystem()) 00130 { 00131 DataStorage::Hash hash(*iter); 00132 DataStorage::istream stream(_global_mutex, *iter); 00133 00134 _callback.iterateDataStorage(hash, stream); 00135 } 00136 } 00137 } 00138 00139 void DataStorage::store(const DataStorage::Hash &hash, DataStorage::Container *data) 00140 { 00141 // wait for resources 00142 if (_store_limited) _store_sem.wait(); 00143 00144 // put the task into the queue 00145 _tasks.push( new StoreDataTask(hash, data) ); 00146 } 00147 00148 const DataStorage::Hash DataStorage::store(DataStorage::Container *data) 00149 { 00150 // create a corresponding hash 00151 DataStorage::Hash hash(*data); 00152 store(hash, data); 00153 return hash; 00154 } 00155 00156 DataStorage::istream DataStorage::retrieve(const DataStorage::Hash &hash) throw (DataNotAvailableException) 00157 { 00158 ibrcommon::File file = _path.get(hash.value); 00159 00160 if (!file.exists()) 00161 { 00162 throw DataNotAvailableException(); 00163 } 00164 00165 return DataStorage::istream(_global_mutex, file); 00166 } 00167 00168 void DataStorage::remove(const DataStorage::Hash &hash) 00169 { 00170 _tasks.push( new RemoveDataTask(hash) ); 00171 } 00172 00173 bool DataStorage::__cancellation() 00174 { 00175 _tasks.abort(); 00176 return true; 00177 } 00178 00179 void DataStorage::run() 00180 { 00181 try { 00182 while (true) 00183 { 00184 Task *t = _tasks.getnpop(true); 00185 00186 try { 00187 StoreDataTask &store = dynamic_cast<StoreDataTask&>(*t); 00188 00189 try { 00190 ibrcommon::File destination = _path.get(store.hash.value); 00191 00192 { 00193 ibrcommon::MutexLock l(_global_mutex); 00194 std::ofstream stream(destination.getPath().c_str(), ios::out | ios::binary | ios::trunc); 00195 00196 // check the streams health 00197 if (!stream.good()) 00198 { 00199 std::stringstream ss; ss << "unable to open filestream [" << std::strerror(errno) << "]"; 00200 throw ibrcommon::IOException(ss.str()); 00201 } 00202 00203 store.container->serialize(stream); 00204 stream.close(); 00205 } 00206 00207 // release resources 00208 if (_store_limited) _store_sem.post(); 00209 00210 // notify the stored item 00211 _callback.eventDataStorageStored(store.hash); 00212 } catch (const ibrcommon::Exception &ex) { 00213 // release resources 00214 if (_store_limited) _store_sem.post(); 00215 00216 // notify the fail of store action 00217 _callback.eventDataStorageStoreFailed(store.hash, ex); 00218 } 00219 } catch (const std::bad_cast&) { 00220 } 00221 00222 try { 00223 RemoveDataTask &remove = dynamic_cast<RemoveDataTask&>(*t); 00224 00225 try { 00226 ibrcommon::File destination = _path.get(remove.hash.value); 00227 { 00228 ibrcommon::MutexLock l(_global_mutex); 00229 if (!destination.exists()) 00230 { 00231 throw DataNotAvailableException(); 00232 } 00233 destination.remove(); 00234 } 00235 _callback.eventDataStorageRemoved(remove.hash); 00236 } catch (const ibrcommon::Exception &ex) { 00237 _callback.eventDataStorageRemoveFailed(remove.hash, ex); 00238 } 00239 } catch (const std::bad_cast&) { 00240 00241 } 00242 00243 delete t; 00244 } 00245 } catch (const ibrcommon::QueueUnblockedException&) { 00246 // exit 00247 } 00248 } 00249 00250 DataStorage::Container::~Container() {}; 00251 DataStorage::Task::~Task() {}; 00252 00253 DataStorage::StoreDataTask::StoreDataTask(const Hash &h, Container *c) 00254 : hash(h), container(c) 00255 {} 00256 00257 DataStorage::StoreDataTask::~StoreDataTask() 00258 { 00259 delete container; 00260 } 00261 00262 DataStorage::RemoveDataTask::RemoveDataTask(const Hash &h) : hash(h) 00263 {} 00264 00265 DataStorage::RemoveDataTask::~RemoveDataTask() 00266 { 00267 } 00268 } 00269 }