IBR-DTNSuite 0.6

daemon/src/api/Registration.cpp

Go to the documentation of this file.
00001 /*
00002  * Registration.cpp
00003  *
00004  *  Created on: 15.06.2011
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "api/Registration.h"
00010 #include "core/BundleStorage.h"
00011 #include "core/BundleCore.h"
00012 #include "core/BundleEvent.h"
00013 
00014 #ifdef HAVE_SQLITE
00015 #include "core/SQLiteBundleStorage.h"
00016 #endif
00017 
00018 #include <ibrdtn/utils/Clock.h>
00019 #include <ibrdtn/utils/Random.h>
00020 #include <ibrcommon/Logger.h>
00021 
00022 #include <limits.h>
00023 #include <stdint.h>
00024 
00025 namespace dtn
00026 {
00027         namespace api
00028         {
00029                 std::set<std::string> Registration::_handles;
00030 
00031                 const std::string Registration::alloc_handle()
00032                 {
00033                         static dtn::utils::Random rand;
00034 
00035                         std::string new_handle = rand.gen_chars(16);
00036 
00037                         while (_handles.find(new_handle) != _handles.end())
00038                         {
00039                                 new_handle = rand.gen_chars(16);
00040                         }
00041 
00042                         Registration::_handles.insert(new_handle);
00043 
00044                         return new_handle;
00045                 }
00046 
00047                 void Registration::free_handle(const std::string &handle)
00048                 {
00049                         Registration::_handles.erase(handle);
00050                 }
00051 
00052                 Registration::Registration() : _handle(alloc_handle())
00053                 {
00054                 }
00055 
00056                 Registration::~Registration()
00057                 {
00058                         free_handle(_handle);
00059                 }
00060 
00061                 void Registration::notify(const NOTIFY_CALL call)
00062                 {
00063                         ibrcommon::MutexLock l(_wait_for_cond);
00064                         if (call == NOTIFY_BUNDLE_AVAILABLE)
00065                         {
00066                                 _no_more_bundles = false;
00067                                 _wait_for_cond.signal(true);
00068                         }
00069                         else
00070                         {
00071                                 _notify_queue.push(call);
00072                         }
00073                 }
00074 
00075                 void Registration::wait_for_bundle()
00076                 {
00077                         ibrcommon::MutexLock l(_wait_for_cond);
00078 
00079                         while (_no_more_bundles)
00080                         {
00081                                 _wait_for_cond.wait();
00082                         }
00083                 }
00084 
00085                 Registration::NOTIFY_CALL Registration::wait()
00086                 {
00087                         return _notify_queue.getnpop(true);
00088                 }
00089 
00090                 bool Registration::hasSubscribed(const dtn::data::EID &endpoint) const
00091                 {
00092                         return (_endpoints.find(endpoint) != _endpoints.end());
00093                 }
00094 
00095                 const std::set<dtn::data::EID>& Registration::getSubscriptions() const
00096                 {
00097                         return _endpoints;
00098                 }
00099 
00100                 void Registration::delivered(const dtn::data::MetaBundle &m)
00101                 {
00102                         // raise bundle event
00103                         dtn::core::BundleEvent::raise(m, dtn::core::BUNDLE_DELIVERED);
00104 
00105                         if (m.get(dtn::data::PrimaryBlock::DESTINATION_IS_SINGLETON))
00106                         {
00107                                 // get the global storage
00108                                 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00109 
00110                                 // delete the bundle
00111                                 storage.remove(m);
00112                         }
00113                 }
00114 
00115                 dtn::data::Bundle Registration::receive() throw (dtn::core::BundleStorage::NoBundleFoundException)
00116                 {
00117                         ibrcommon::MutexLock l(_receive_lock);
00118 
00119                         // get the global storage
00120                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00121 
00122                         while (true)
00123                         {
00124                                 try {
00125                                         // get the first bundle in the queue
00126                                         dtn::data::MetaBundle b = _queue.getnpop(false);
00127 
00128                                         // load the bundle
00129                                         return storage.get(b);
00130                                 } catch (const ibrcommon::QueueUnblockedException &e) {
00131                                         if (e.reason == ibrcommon::QueueUnblockedException::QUEUE_ABORT)
00132                                         {
00133                                                 // query for new bundles
00134                                                 underflow();
00135                                         }
00136                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) { }
00137                         }
00138 
00139                         throw dtn::core::BundleStorage::NoBundleFoundException();
00140                 }
00141 
00142                 void Registration::underflow()
00143                 {
00144                         // expire outdated bundles in the list
00145                         _received_bundles.expire(dtn::utils::Clock::getTime());
00146 
00150 #ifdef HAVE_SQLITE
00151                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00152 #else
00153                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00154 #endif
00155                         {
00156                         public:
00157                                 BundleFilter(const std::set<dtn::data::EID> endpoints, const dtn::data::BundleList &blist)
00158                                  : _endpoints(endpoints), _blist(blist)
00159                                 {};
00160 
00161                                 virtual ~BundleFilter() {};
00162 
00163                                 virtual size_t limit() const { return 10; };
00164 
00165                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00166                                 {
00167                                         if (_endpoints.find(meta.destination) == _endpoints.end())
00168                                         {
00169                                                 return false;
00170                                         }
00171 
00172                                         IBRCOMMON_LOGGER_DEBUG(10) << "search bundle in the list of delivered bundles: " << meta.toString() << IBRCOMMON_LOGGER_ENDL;
00173 
00174                                         if (_blist.contains(meta))
00175                                         {
00176                                                 return false;
00177                                         }
00178 
00179                                         return true;
00180                                 };
00181 
00182 #ifdef HAVE_SQLITE
00183                                 const std::string getWhere() const
00184                                 {
00185                                         if (_endpoints.size() > 1)
00186                                         {
00187                                                 std::string where = "(";
00188 
00189                                                 for (size_t i = _endpoints.size() - 1; i > 0; i--)
00190                                                 {
00191                                                         where += "destination = ? OR ";
00192                                                 }
00193 
00194                                                 return where + "destination = ?)";
00195                                         }
00196                                         else if (_endpoints.size() == 1)
00197                                         {
00198                                                 return "destination = ?";
00199                                         }
00200                                         else
00201                                         {
00202                                                 return "destination = null";
00203                                         }
00204                                 };
00205 
00206                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00207                                 {
00208                                         size_t o = offset;
00209 
00210                                         for (std::set<dtn::data::EID>::const_iterator iter = _endpoints.begin(); iter != _endpoints.end(); iter++)
00211                                         {
00212                                                 const std::string data = (*iter).getString();
00213 
00214                                                 sqlite3_bind_text(st, o, data.c_str(), data.size(), SQLITE_TRANSIENT);
00215                                                 o++;
00216                                         }
00217 
00218                                         return o;
00219                                 }
00220 #endif
00221 
00222                         private:
00223                                 const std::set<dtn::data::EID> _endpoints;
00224                                 const dtn::data::BundleList &_blist;
00225                         } filter(_endpoints, _received_bundles);
00226 
00227                         // get the global storage
00228                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00229 
00230                         // query the database for more bundles
00231                         const std::list<dtn::data::MetaBundle> list = storage.get( filter );
00232 
00233                         if (list.size() == 0)
00234                         {
00235                                 ibrcommon::MutexLock l(_wait_for_cond);
00236                                 _no_more_bundles = true;
00237                                 throw dtn::core::BundleStorage::NoBundleFoundException();
00238                         }
00239 
00240                         try {
00241                                 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00242                                 {
00243                                         _queue.push(*iter);
00244 
00245                                         IBRCOMMON_LOGGER_DEBUG(10) << "add bundle to list of delivered bundles: " << (*iter).toString() << IBRCOMMON_LOGGER_ENDL;
00246                                         _received_bundles.add(*iter);
00247                                 }
00248                         } catch (const ibrcommon::Exception&) { }
00249                 }
00250 
00251                 void Registration::subscribe(const dtn::data::EID &endpoint)
00252                 {
00253                         _endpoints.insert(endpoint);
00254                 }
00255 
00256                 void Registration::unsubscribe(const dtn::data::EID &endpoint)
00257                 {
00258                         _endpoints.erase(endpoint);
00259                 }
00260 
00264                 bool Registration::operator==(const std::string &other) const
00265                 {
00266                         return (_handle == other);
00267                 }
00268 
00272                 bool Registration::operator==(const Registration &other) const
00273                 {
00274                         return (_handle == other._handle);
00275                 }
00276 
00280                 bool Registration::operator<(const Registration &other) const
00281                 {
00282                         return (_handle < other._handle);
00283                 }
00284 
00285                 void Registration::abort()
00286                 {
00287                         _queue.abort();
00288                         _notify_queue.abort();
00289 
00290                         ibrcommon::MutexLock l(_wait_for_cond);
00291                         _wait_for_cond.abort();
00292                 }
00293 
00294                 const std::string& Registration::getHandle() const
00295                 {
00296                         return _handle;
00297                 }
00298         }
00299 }