|
IBR-DTNSuite 0.6
|
00001 /* 00002 * Registration.cpp 00003 * 00004 * Created on: 15.06.2011 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "api/Registration.h" 00010 #include "core/BundleStorage.h" 00011 #include "core/BundleCore.h" 00012 #include "core/BundleEvent.h" 00013 00014 #ifdef HAVE_SQLITE 00015 #include "core/SQLiteBundleStorage.h" 00016 #endif 00017 00018 #include <ibrdtn/utils/Clock.h> 00019 #include <ibrdtn/utils/Random.h> 00020 #include <ibrcommon/Logger.h> 00021 00022 #include <limits.h> 00023 #include <stdint.h> 00024 00025 namespace dtn 00026 { 00027 namespace api 00028 { 00029 std::set<std::string> Registration::_handles; 00030 00031 const std::string Registration::alloc_handle() 00032 { 00033 static dtn::utils::Random rand; 00034 00035 std::string new_handle = rand.gen_chars(16); 00036 00037 while (_handles.find(new_handle) != _handles.end()) 00038 { 00039 new_handle = rand.gen_chars(16); 00040 } 00041 00042 Registration::_handles.insert(new_handle); 00043 00044 return new_handle; 00045 } 00046 00047 void Registration::free_handle(const std::string &handle) 00048 { 00049 Registration::_handles.erase(handle); 00050 } 00051 00052 Registration::Registration() : _handle(alloc_handle()) 00053 { 00054 } 00055 00056 Registration::~Registration() 00057 { 00058 free_handle(_handle); 00059 } 00060 00061 void Registration::notify(const NOTIFY_CALL call) 00062 { 00063 ibrcommon::MutexLock l(_wait_for_cond); 00064 if (call == NOTIFY_BUNDLE_AVAILABLE) 00065 { 00066 _no_more_bundles = false; 00067 _wait_for_cond.signal(true); 00068 } 00069 else 00070 { 00071 _notify_queue.push(call); 00072 } 00073 } 00074 00075 void Registration::wait_for_bundle() 00076 { 00077 ibrcommon::MutexLock l(_wait_for_cond); 00078 00079 while (_no_more_bundles) 00080 { 00081 _wait_for_cond.wait(); 00082 } 00083 } 00084 00085 Registration::NOTIFY_CALL Registration::wait() 00086 { 00087 return _notify_queue.getnpop(true); 00088 } 00089 00090 bool Registration::hasSubscribed(const dtn::data::EID &endpoint) const 00091 { 00092 return (_endpoints.find(endpoint) != _endpoints.end()); 00093 } 00094 00095 const std::set<dtn::data::EID>& Registration::getSubscriptions() const 00096 { 00097 return _endpoints; 00098 } 00099 00100 void Registration::delivered(const dtn::data::MetaBundle &m) 00101 { 00102 // raise bundle event 00103 dtn::core::BundleEvent::raise(m, dtn::core::BUNDLE_DELIVERED); 00104 00105 if (m.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON)) 00106 { 00107 // get the global storage 00108 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00109 00110 // delete the bundle 00111 storage.remove(m); 00112 } 00113 } 00114 00115 dtn::data::Bundle Registration::receive() throw (dtn::core::BundleStorage::NoBundleFoundException) 00116 { 00117 ibrcommon::MutexLock l(_receive_lock); 00118 00119 // get the global storage 00120 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00121 00122 while (true) 00123 { 00124 try { 00125 // get the first bundle in the queue 00126 dtn::data::MetaBundle b = _queue.getnpop(false); 00127 00128 // load the bundle 00129 return storage.get(b); 00130 } catch (const ibrcommon::QueueUnblockedException &e) { 00131 if (e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT) 00132 { 00133 // query for new bundles 00134 underflow(); 00135 } 00136 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { } 00137 } 00138 00139 throw dtn::core::BundleStorage::NoBundleFoundException(); 00140 } 00141 00142 void Registration::underflow() 00143 { 00144 // expire outdated bundles in the list 00145 _received_bundles.expire(dtn::utils::Clock::getTime()); 00146 00150 #ifdef HAVE_SQLITE 00151 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery 00152 #else 00153 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00154 #endif 00155 { 00156 public: 00157 BundleFilter(const std::set<dtn::data::EID> endpoints, const dtn::data::BundleList &blist) 00158 : _endpoints(endpoints), _blist(blist) 00159 {}; 00160 00161 virtual ~BundleFilter() {}; 00162 00163 virtual size_t limit() const { return 10; }; 00164 00165 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00166 { 00167 if (_endpoints.find(meta.destination) == _endpoints.end()) 00168 { 00169 return false; 00170 } 00171 00172 IBRCOMMON_LOGGER_DEBUG(10) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL; 00173 00174 if (_blist.contains(meta)) 00175 { 00176 return false; 00177 } 00178 00179 return true; 00180 }; 00181 00182 #ifdef HAVE_SQLITE 00183 const std::string getWhere() const 00184 { 00185 if (_endpoints.size() > 1) 00186 { 00187 std::string where = "("; 00188 00189 for (size_t i = _endpoints.size() - 1; i > 0; i--) 00190 { 00191 where += "destination = ? OR "; 00192 } 00193 00194 return where + "destination = ?)"; 00195 } 00196 else if (_endpoints.size() == 1) 00197 { 00198 return "destination = ?"; 00199 } 00200 else 00201 { 00202 return "destination = null"; 00203 } 00204 }; 00205 00206 size_t bind(sqlite3_stmt *st, size_t offset) const 00207 { 00208 size_t o = offset; 00209 00210 for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); iter++) 00211 { 00212 const std::string data = (*iter).getString(); 00213 00214 sqlite3_bind_text(st, o, data.c_str(), data.size(), SQLITE_TRANSIENT); 00215 o++; 00216 } 00217 00218 return o; 00219 } 00220 #endif 00221 00222 private: 00223 const std::set<dtn::data::EID> _endpoints; 00224 const dtn::data::BundleList &_blist; 00225 } filter(_endpoints, _received_bundles); 00226 00227 // get the global storage 00228 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00229 00230 // query the database for more bundles 00231 const std::list<dtn::data::MetaBundle> list = storage.get( filter ); 00232 00233 if (list.size() == 0) 00234 { 00235 ibrcommon::MutexLock l(_wait_for_cond); 00236 _no_more_bundles = true; 00237 throw dtn::core::BundleStorage::NoBundleFoundException(); 00238 } 00239 00240 try { 00241 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00242 { 00243 _queue.push(*iter); 00244 00245 IBRCOMMON_LOGGER_DEBUG(10) << "add bundle to list of delivered bundles: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL; 00246 _received_bundles.add(*iter); 00247 } 00248 } catch (const ibrcommon::Exception&) { } 00249 } 00250 00251 void Registration::subscribe(const dtn::data::EID &endpoint) 00252 { 00253 _endpoints.insert(endpoint); 00254 } 00255 00256 void Registration::unsubscribe(const dtn::data::EID &endpoint) 00257 { 00258 _endpoints.erase(endpoint); 00259 } 00260 00264 bool Registration::operator==(const std::string &other) const 00265 { 00266 return (_handle == other); 00267 } 00268 00272 bool Registration::operator==(const Registration &other) const 00273 { 00274 return (_handle == other._handle); 00275 } 00276 00280 bool Registration::operator<(const Registration &other) const 00281 { 00282 return (_handle < other._handle); 00283 } 00284 00285 void Registration::abort() 00286 { 00287 _queue.abort(); 00288 _notify_queue.abort(); 00289 00290 ibrcommon::MutexLock l(_wait_for_cond); 00291 _wait_for_cond.abort(); 00292 } 00293 00294 const std::string& Registration::getHandle() const 00295 { 00296 return _handle; 00297 } 00298 } 00299 }