• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/ApiServer.cpp

Go to the documentation of this file.
00001 /*
00002  * ApiServer.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "ApiServer.h"
00010 #include "core/BundleCore.h"
00011 #include "core/EventSwitch.h"
00012 #include "routing/QueueBundleEvent.h"
00013 #include "ClientHandler.h"
00014 #include <ibrcommon/Logger.h>
00015 #include <typeinfo>
00016 #include <algorithm>
00017 
00018 #ifdef HAVE_SQLITE
00019 #include "core/SQLiteBundleStorage.h"
00020 #endif
00021 
00022 namespace dtn
00023 {
00024         namespace daemon
00025         {
00026                 ApiServer::ApiServer(const ibrcommon::File &socket)
00027                  : _tcpsrv(socket), _next_connection_id(1)
00028                 {
00029                 }
00030 
00031                 ApiServer::ApiServer(const ibrcommon::vinterface &net, int port)
00032                 {
00033                         _tcpsrv.bind(net, port);
00034                 }
00035 
00036                 ApiServer::~ApiServer()
00037                 {
00038                         join();
00039                 }
00040 
00041                 bool ApiServer::__cancellation()
00042                 {
00043                         shutdown();
00044                         return true;
00045                 }
00046 
00047                 void ApiServer::componentUp()
00048                 {
00049                         _tcpsrv.listen(5);
00050 
00051                         try {
00052                                 _dist.start();
00053                         } catch (const ibrcommon::ThreadException &ex) {
00054                                 IBRCOMMON_LOGGER(error) << "failed to start ApiServer\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00055                         }
00056                 }
00057 
00058                 void ApiServer::componentRun()
00059                 {
00060                         // enable interruption in this thread
00061                         Thread::enable_interruption();
00062 
00063                         try {
00064                                 while (true)
00065                                 {
00066                                         ClientHandler *obj = new ClientHandler(*this, _tcpsrv.accept(), _next_connection_id);
00067                                         _next_connection_id++;
00068 
00069                                         // initialize the object
00070                                         obj->initialize();
00071 
00072                                         // breakpoint
00073                                         ibrcommon::Thread::yield();
00074                                 }
00075                         } catch (const std::exception&) {
00076                                 // ignore all errors
00077                                 return;
00078                         }
00079                 }
00080 
00081                 void ApiServer::componentDown()
00082                 {
00083                         _dist.shutdown();
00084                         shutdown();
00085                 }
00086 
00087                 void ApiServer::shutdown()
00088                 {
00089                         _dist.stop();
00090                         _tcpsrv.shutdown();
00091                         _tcpsrv.close();
00092                         Thread::interrupt();
00093                 }
00094 
00095                 void ApiServer::connectionUp(ClientHandler *obj)
00096                 {
00097                         _dist.add(obj);
00098                         IBRCOMMON_LOGGER_DEBUG(5) << "client connection up (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00099                         IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00100                 }
00101 
00102                 void ApiServer::connectionDown(ClientHandler *obj)
00103                 {
00104                         _dist.remove(obj);
00105                         IBRCOMMON_LOGGER_DEBUG(5) << "client connection down (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00106                         IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00107                 }
00108 
00109                 ApiServer::Distributor::Distributor()
00110                 {
00111                         bindEvent(dtn::routing::QueueBundleEvent::className);
00112                 }
00113 
00114                 ApiServer::Distributor::~Distributor()
00115                 {
00116                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00117                         join();
00118                 }
00119 
00120                 void ApiServer::Distributor::add(ClientHandler *obj)
00121                 {
00122                         {
00123                                 ibrcommon::MutexLock l(_connections_cond);
00124                                 _connections.push_back(obj);
00125                                 _connections_cond.signal(true);
00126                         }
00127                         _tasks.push(new QueryBundleTask(obj->id));
00128                 }
00129 
00130                 void ApiServer::Distributor::remove(ClientHandler *obj)
00131                 {
00132                         ibrcommon::MutexLock l(_connections_cond);
00133                         _connections.erase( std::remove( _connections.begin(), _connections.end(), obj), _connections.end() );
00134                         _connections_cond.signal(true);
00135                 }
00136 
00137                 bool ApiServer::Distributor::__cancellation()
00138                 {
00139                         // cancel the main thread in here
00140                         _tasks.abort();
00141 
00142                         // return true, to signal that no further cancel (the hardway) is needed
00143                         return true;
00144                 }
00145 
00149                 void ApiServer::Distributor::run()
00150                 {
00151 #ifdef HAVE_SQLITE
00152                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00153 #else
00154                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00155 #endif
00156                         {
00157                         public:
00158                                 BundleFilter(const dtn::data::EID &destination)
00159                                  : _destination(destination)
00160                                 {};
00161 
00162                                 virtual ~BundleFilter() {};
00163 
00164                                 virtual size_t limit() const { return 5; };
00165 
00166                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00167                                 {
00168                                         if (_destination != meta.destination)
00169                                         {
00170                                                 return false;
00171                                         }
00172 
00173                                         return true;
00174                                 };
00175 
00176 #ifdef HAVE_SQLITE
00177                                 const std::string getWhere() const
00178                                 {
00179                                         return "destination = ?";
00180                                 };
00181 
00182                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00183                                 {
00184                                         sqlite3_bind_text(st, offset, _destination.getString().c_str(), _destination.getString().size(), SQLITE_TRANSIENT);
00185                                         return offset + 1;
00186                                 }
00187 #endif
00188 
00189                         private:
00190                                 const dtn::data::EID &_destination;
00191                         };
00192 
00193                         try
00194                         {
00195                                 while (true)
00196                                 {
00197                                         // get the next task
00198                                         ApiServer::Task *task = _tasks.getnpop(true);
00199 
00200                                         try {
00201                                                 try {
00202                                                         QueryBundleTask &query = dynamic_cast<QueryBundleTask&>(*task);
00203 
00204                                                         IBRCOMMON_LOGGER_DEBUG(60) << "QueryBundleTask: " << query.id << IBRCOMMON_LOGGER_ENDL;
00205 
00206                                                         // get the global storage
00207                                                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00208 
00209                                                         // lock the connection list
00210                                                         ibrcommon::MutexLock l(_connections_cond);
00211 
00212                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00213                                                         {
00214                                                                 ClientHandler *handler = (*iter);
00215 
00216                                                                 if (handler->id == query.id)
00217                                                                 {
00218                                                                         BundleFilter filter(handler->getPeer());
00219                                                                         const std::list<dtn::data::MetaBundle> list = storage.get( filter );
00220 
00221                                                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00222                                                                         {
00223                                                                                 try {
00224                                                                                         const dtn::data::Bundle b = storage.get(*iter);
00225 
00226                                                                                         // push the bundle to the client
00227                                                                                         handler->queue(b);
00228 
00229                                                                                         // generate a delete bundle task
00230                                                                                         _tasks.push(new RemoveBundleTask(b));
00231                                                                                 }
00232                                                                                 catch (const dtn::core::BundleStorage::BundleLoadException&)
00233                                                                                 {
00234                                                                                 }
00235                                                                                 catch (const dtn::core::BundleStorage::NoBundleFoundException&)
00236                                                                                 {
00237                                                                                         break;
00238                                                                                 }
00239                                                                         }
00240 
00241                                                                         if (list.size() > 0)
00242                                                                         {
00243                                                                                 // generate a task for this receiver
00244                                                                                 _tasks.push(new QueryBundleTask(query.id));
00245                                                                         }
00246                                                                 }
00247                                                         }
00248                                                 } catch (const std::bad_cast&) {};
00249 
00250                                                 try {
00251                                                         ProcessBundleTask &pbt = dynamic_cast<ProcessBundleTask&>(*task);
00252 
00253                                                         IBRCOMMON_LOGGER_DEBUG(60) << "ProcessBundleTask: " << pbt.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00254 
00255                                                         // search for all receiver of this bundle
00256                                                         ibrcommon::MutexLock l(_connections_cond);
00257                                                         bool deleteIt = false;
00258 
00259                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00260                                                         {
00261                                                                 ClientHandler *handler = (*iter);
00262 
00263                                                                 if (handler->getPeer() == pbt.bundle.destination)
00264                                                                 {
00265                                                                         // generate a task for each receiver
00266                                                                         _tasks.push(new TransferBundleTask(pbt.bundle, handler->id));
00267 
00268                                                                         deleteIt = true;
00269                                                                 }
00270                                                         }
00271 
00272                                                         // generate a delete bundle task
00273                                                         if (deleteIt) _tasks.push(new RemoveBundleTask(pbt.bundle));
00274 
00275                                                 } catch (const std::bad_cast&) { };
00276 
00277                                                 try {
00278                                                         TransferBundleTask &transfer = dynamic_cast<TransferBundleTask&>(*task);
00279 
00280                                                         IBRCOMMON_LOGGER_DEBUG(60) << "TransferBundleTask: " << transfer.bundle.toString() << ", id: " << transfer.id << IBRCOMMON_LOGGER_ENDL;
00281 
00282                                                         // get the global storage
00283                                                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00284 
00285                                                         // search for all receiver of this bundle
00286                                                         ibrcommon::MutexLock l(_connections_cond);
00287 
00288                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00289                                                         {
00290                                                                 ClientHandler *handler = (*iter);
00291 
00292                                                                 try {
00293                                                                         if (handler->id == transfer.id)
00294                                                                         {
00295                                                                                 const dtn::data::Bundle b = storage.get( transfer.bundle );
00296 
00297                                                                                 // push the bundle to the client
00298                                                                                 handler->queue(b);
00299                                                                         }
00300                                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {};
00301                                                         }
00302 
00303                                                 } catch (const std::bad_cast&) { };
00304 
00305                                                 try {
00306                                                         RemoveBundleTask &r = dynamic_cast<RemoveBundleTask&>(*task);
00307 
00308                                                         IBRCOMMON_LOGGER_DEBUG(60) << "RemoveBundleTask: " << r.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00309 
00310                                                         // get the global storage
00311                                                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00312 
00313                                                         storage.remove(r.bundle);
00314 
00315                                                 } catch (const std::bad_cast&) {};
00316                                         } catch (const std::exception&) {};
00317 
00318                                         delete task;
00319 
00320                                         yield();
00321                                 }
00322                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00323                                 IBRCOMMON_LOGGER_DEBUG(10) << "ApiServer::Distributor going down" << IBRCOMMON_LOGGER_ENDL;
00324                         } catch (const std::exception&) {
00325                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL;
00326                         }
00327                 }
00328 
00332                 void ApiServer::Distributor::raiseEvent(const dtn::core::Event *evt)
00333                 {
00334                         try {
00335                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00336                                 _tasks.push(new ApiServer::ProcessBundleTask(queued.bundle));
00337                         } catch (const std::bad_cast&) {
00338 
00339                         }
00340                 }
00341 
00342                 const std::string ApiServer::getName() const
00343                 {
00344                         return "ApiServer";
00345                 }
00346 
00347                 void ApiServer::Distributor::shutdown()
00348                 {
00349                         closeAll();
00350 
00351                         // wait until all client connections are down
00352                         ibrcommon::MutexLock l(_connections_cond);
00353                         while (_connections.size() > 0) _connections_cond.wait();
00354                 }
00355 
00356                 void ApiServer::Distributor::closeAll()
00357                 {
00358                         // search for an existing connection
00359                         ibrcommon::MutexLock l(_connections_cond);
00360                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00361                         {
00362                                 ClientHandler &conn = *(*iter);
00363 
00364                                 // close the connection immediately
00365                                 conn.shutdown();
00366                         }
00367                 }
00368 
00369                 ApiServer::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &b) : bundle(b) {}
00370                 ApiServer::ProcessBundleTask::~ProcessBundleTask(){}
00371 
00372                 ApiServer::TransferBundleTask::TransferBundleTask(const dtn::data::BundleID &b, const size_t i) : bundle(b), id(i) {}
00373                 ApiServer::TransferBundleTask::~TransferBundleTask() {}
00374 
00375                 ApiServer::RemoveBundleTask::RemoveBundleTask(const dtn::data::BundleID &b) : bundle(b) {}
00376                 ApiServer::RemoveBundleTask::~RemoveBundleTask() {}
00377 
00378                 ApiServer::QueryBundleTask::QueryBundleTask(const size_t i) : id(i) {}
00379                 ApiServer::QueryBundleTask::~QueryBundleTask() {}
00380         }
00381 }

Generated on Wed Mar 30 2011 11:11:48 for IBR-DTNSuite by  doxygen 1.7.1