IBR-DTNSuite 0.6

daemon/src/api/ApiServer.cpp

Go to the documentation of this file.
00001 /*
00002  * ApiServer.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "api/ApiServer.h"
00010 #include "api/ClientHandler.h"
00011 #include "core/BundleCore.h"
00012 #include "core/EventSwitch.h"
00013 #include "routing/QueueBundleEvent.h"
00014 #include <ibrcommon/Logger.h>
00015 #include <typeinfo>
00016 #include <algorithm>
00017 
00018 #ifdef HAVE_SQLITE
00019 #include "core/SQLiteBundleStorage.h"
00020 #endif
00021 
00022 namespace dtn
00023 {
00024         namespace api
00025         {
00026                 ApiServer::ApiServer(const ibrcommon::File &socket)
00027                  : _tcpsrv(socket), _next_connection_id(1)
00028                 {
00029                 }
00030 
00031                 ApiServer::ApiServer(const ibrcommon::vinterface &net, int port)
00032                 {
00033                         _tcpsrv.bind(net, port);
00034                 }
00035 
00036                 ApiServer::~ApiServer()
00037                 {
00038                         join();
00039                 }
00040 
00041                 bool ApiServer::__cancellation()
00042                 {
00043                         shutdown();
00044                         return true;
00045                 }
00046 
00047                 void ApiServer::componentUp()
00048                 {
00049                         _tcpsrv.listen(5);
00050 
00051                         try {
00052                                 _dist.start();
00053                         } catch (const ibrcommon::ThreadException &ex) {
00054                                 IBRCOMMON_LOGGER(error) << "failed to start ApiServer\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00055                         }
00056                 }
00057 
00058                 void ApiServer::componentRun()
00059                 {
00060                         try {
00061                                 while (true)
00062                                 {
00063                                         ClientHandler *obj = new ClientHandler(*this, _tcpsrv.accept(), _next_connection_id);
00064                                         _next_connection_id++;
00065 
00066                                         // initialize the object
00067                                         obj->initialize();
00068 
00069                                         // breakpoint
00070                                         ibrcommon::Thread::yield();
00071                                 }
00072                         } catch (const std::exception&) {
00073                                 // ignore all errors
00074                                 return;
00075                         }
00076                 }
00077 
00078                 void ApiServer::componentDown()
00079                 {
00080                         _dist.shutdown();
00081                         shutdown();
00082                 }
00083 
00084                 void ApiServer::shutdown()
00085                 {
00086                         _dist.stop();
00087                         _tcpsrv.shutdown();
00088                         _tcpsrv.close();
00089                 }
00090 
00091                 void ApiServer::connectionUp(ClientHandler *obj)
00092                 {
00093                         _dist.add(obj);
00094                         IBRCOMMON_LOGGER_DEBUG(5) << "client connection up (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00095                         IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00096                 }
00097 
00098                 void ApiServer::connectionDown(ClientHandler *obj)
00099                 {
00100                         _dist.remove(obj);
00101                         IBRCOMMON_LOGGER_DEBUG(5) << "client connection down (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00102                         IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00103                 }
00104 
00105                 ApiServer::Distributor::Distributor()
00106                 {
00107                         bindEvent(dtn::routing::QueueBundleEvent::className);
00108                 }
00109 
00110                 ApiServer::Distributor::~Distributor()
00111                 {
00112                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00113                         join();
00114                 }
00115 
00116                 void ApiServer::Distributor::add(ClientHandler *obj)
00117                 {
00118                         {
00119                                 ibrcommon::MutexLock l(_connections_cond);
00120                                 _connections.push_back(obj);
00121                                 _connections_cond.signal(true);
00122                         }
00123                         _tasks.push(new QueryBundleTask(obj->id));
00124                 }
00125 
00126                 void ApiServer::Distributor::remove(ClientHandler *obj)
00127                 {
00128                         ibrcommon::MutexLock l(_connections_cond);
00129                         _connections.erase( std::remove( _connections.begin(), _connections.end(), obj), _connections.end() );
00130                         _connections_cond.signal(true);
00131                 }
00132 
00133                 bool ApiServer::Distributor::__cancellation()
00134                 {
00135                         // cancel the main thread in here
00136                         _tasks.abort();
00137 
00138                         // return true, to signal that no further cancel (the hardway) is needed
00139                         return true;
00140                 }
00141 
00145                 void ApiServer::Distributor::run()
00146                 {
00147 #ifdef HAVE_SQLITE
00148                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00149 #else
00150                         class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00151 #endif
00152                         {
00153                         public:
00154                                 BundleFilter(const dtn::data::EID &destination)
00155                                  : _destination(destination)
00156                                 {};
00157 
00158                                 virtual ~BundleFilter() {};
00159 
00160                                 virtual size_t limit() const { return 5; };
00161 
00162                                 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00163                                 {
00164                                         if (_destination != meta.destination)
00165                                         {
00166                                                 return false;
00167                                         }
00168 
00169                                         return true;
00170                                 };
00171 
00172 #ifdef HAVE_SQLITE
00173                                 const std::string getWhere() const
00174                                 {
00175                                         return "destination = ?";
00176                                 };
00177 
00178                                 size_t bind(sqlite3_stmt *st, size_t offset) const
00179                                 {
00180                                         sqlite3_bind_text(st, offset, _destination.getString().c_str(), _destination.getString().size(), SQLITE_TRANSIENT);
00181                                         return offset + 1;
00182                                 }
00183 #endif
00184 
00185                         private:
00186                                 const dtn::data::EID &_destination;
00187                         };
00188 
00189                         try
00190                         {
00191                                 while (true)
00192                                 {
00193                                         // get the next task
00194                                         ApiServer::Task *task = _tasks.getnpop(true);
00195 
00196                                         try {
00197                                                 try {
00198                                                         QueryBundleTask &query = dynamic_cast<QueryBundleTask&>(*task);
00199 
00200                                                         IBRCOMMON_LOGGER_DEBUG(60) << "QueryBundleTask: " << query.id << IBRCOMMON_LOGGER_ENDL;
00201 
00202                                                         // get the global storage
00203                                                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00204 
00205                                                         // lock the connection list
00206                                                         ibrcommon::MutexLock l(_connections_cond);
00207 
00208                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00209                                                         {
00210                                                                 ClientHandler *handler = (*iter);
00211 
00212                                                                 if (handler->id == query.id)
00213                                                                 {
00214                                                                         BundleFilter filter(handler->getPeer());
00215                                                                         const std::list<dtn::data::MetaBundle> list = storage.get( filter );
00216 
00217                                                                         for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00218                                                                         {
00219                                                                                 try {
00220                                                                                         const dtn::data::Bundle b = storage.get(*iter);
00221 
00222                                                                                         // push the bundle to the client
00223                                                                                         handler->queue(b);
00224 
00225                                                                                         // generate a delete bundle task
00226                                                                                         _tasks.push(new RemoveBundleTask(b));
00227                                                                                 }
00228                                                                                 catch (const dtn::core::BundleStorage::BundleLoadException&)
00229                                                                                 {
00230                                                                                 }
00231                                                                                 catch (const dtn::core::BundleStorage::NoBundleFoundException&)
00232                                                                                 {
00233                                                                                         break;
00234                                                                                 }
00235                                                                         }
00236 
00237                                                                         if (list.size() > 0)
00238                                                                         {
00239                                                                                 // generate a task for this receiver
00240                                                                                 _tasks.push(new QueryBundleTask(query.id));
00241                                                                         }
00242                                                                 }
00243                                                         }
00244                                                 } catch (const std::bad_cast&) {};
00245 
00246                                                 try {
00247                                                         ProcessBundleTask &pbt = dynamic_cast<ProcessBundleTask&>(*task);
00248 
00249                                                         IBRCOMMON_LOGGER_DEBUG(60) << "ProcessBundleTask: " << pbt.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00250 
00251                                                         // search for all receiver of this bundle
00252                                                         ibrcommon::MutexLock l(_connections_cond);
00253                                                         bool deleteIt = false;
00254 
00255                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00256                                                         {
00257                                                                 ClientHandler *handler = (*iter);
00258 
00259                                                                 if (handler->getPeer() == pbt.bundle.destination)
00260                                                                 {
00261                                                                         // generate a task for each receiver
00262                                                                         _tasks.push(new TransferBundleTask(pbt.bundle, handler->id));
00263 
00264                                                                         deleteIt = true;
00265                                                                 }
00266                                                         }
00267 
00268                                                         // generate a delete bundle task
00269                                                         if (deleteIt) _tasks.push(new RemoveBundleTask(pbt.bundle));
00270 
00271                                                 } catch (const std::bad_cast&) { };
00272 
00273                                                 try {
00274                                                         TransferBundleTask &transfer = dynamic_cast<TransferBundleTask&>(*task);
00275 
00276                                                         IBRCOMMON_LOGGER_DEBUG(60) << "TransferBundleTask: " << transfer.bundle.toString() << ", id: " << transfer.id << IBRCOMMON_LOGGER_ENDL;
00277 
00278                                                         // get the global storage
00279                                                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00280 
00281                                                         // search for all receiver of this bundle
00282                                                         ibrcommon::MutexLock l(_connections_cond);
00283 
00284                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00285                                                         {
00286                                                                 ClientHandler *handler = (*iter);
00287 
00288                                                                 try {
00289                                                                         if (handler->id == transfer.id)
00290                                                                         {
00291                                                                                 const dtn::data::Bundle b = storage.get( transfer.bundle );
00292 
00293                                                                                 // push the bundle to the client
00294                                                                                 handler->queue(b);
00295                                                                         }
00296                                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {};
00297                                                         }
00298 
00299                                                 } catch (const std::bad_cast&) { };
00300 
00301                                                 try {
00302                                                         RemoveBundleTask &r = dynamic_cast<RemoveBundleTask&>(*task);
00303 
00304                                                         IBRCOMMON_LOGGER_DEBUG(60) << "RemoveBundleTask: " << r.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00305 
00306                                                         // get the global storage
00307                                                         dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00308 
00309                                                         storage.remove(r.bundle);
00310 
00311                                                 } catch (const std::bad_cast&) {};
00312                                         } catch (const std::exception&) {};
00313 
00314                                         delete task;
00315 
00316                                         yield();
00317                                 }
00318                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00319                                 IBRCOMMON_LOGGER_DEBUG(10) << "ApiServer::Distributor going down" << IBRCOMMON_LOGGER_ENDL;
00320                         } catch (const std::exception&) {
00321                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL;
00322                         }
00323                 }
00324 
00328                 void ApiServer::Distributor::raiseEvent(const dtn::core::Event *evt)
00329                 {
00330                         try {
00331                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00332                                 _tasks.push(new ApiServer::ProcessBundleTask(queued.bundle));
00333                         } catch (const std::bad_cast&) {
00334 
00335                         }
00336                 }
00337 
00338                 const std::string ApiServer::getName() const
00339                 {
00340                         return "ApiServer";
00341                 }
00342 
00343                 void ApiServer::Distributor::shutdown()
00344                 {
00345                         closeAll();
00346 
00347                         // wait until all client connections are down
00348                         ibrcommon::MutexLock l(_connections_cond);
00349                         while (_connections.size() > 0) _connections_cond.wait();
00350                 }
00351 
00352                 void ApiServer::Distributor::closeAll()
00353                 {
00354                         // search for an existing connection
00355                         ibrcommon::MutexLock l(_connections_cond);
00356                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00357                         {
00358                                 ClientHandler &conn = *(*iter);
00359 
00360                                 // close the connection immediately
00361                                 conn.shutdown();
00362                         }
00363                 }
00364 
00365                 ApiServer::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &b) : bundle(b) {}
00366                 ApiServer::ProcessBundleTask::~ProcessBundleTask(){}
00367 
00368                 ApiServer::TransferBundleTask::TransferBundleTask(const dtn::data::BundleID &b, const size_t i) : bundle(b), id(i) {}
00369                 ApiServer::TransferBundleTask::~TransferBundleTask() {}
00370 
00371                 ApiServer::RemoveBundleTask::RemoveBundleTask(const dtn::data::BundleID &b) : bundle(b) {}
00372                 ApiServer::RemoveBundleTask::~RemoveBundleTask() {}
00373 
00374                 ApiServer::QueryBundleTask::QueryBundleTask(const size_t i) : id(i) {}
00375                 ApiServer::QueryBundleTask::~QueryBundleTask() {}
00376         }
00377 }