• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/ApiServer.cpp

Go to the documentation of this file.
00001 /*
00002  * ApiServer.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ApiServer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/EventSwitch.h"
00011 #include "routing/QueueBundleEvent.h"
00012 #include "ClientHandler.h"
00013 #include <ibrcommon/Logger.h>
00014 #include <typeinfo>
00015 #include <algorithm>
00016 
00017 using namespace dtn::data;
00018 using namespace dtn::core;
00019 using namespace dtn::streams;
00020 using namespace std;
00021 
00022 namespace dtn
00023 {
00024         namespace daemon
00025         {
00026                 ApiServer::ApiServer(const ibrcommon::File &socket)
00027                  : _tcpsrv(socket), _dist(), _next_connection_id(1)
00028                 {
00029                 }
00030 
00031                 ApiServer::ApiServer(ibrcommon::NetInterface net, int port)
00032                  : _tcpsrv(net, port), _dist()
00033                 {
00034                 }
00035 
00036                 ApiServer::~ApiServer()
00037                 {
00038                         join();
00039                 }
00040 
00041                 bool ApiServer::__cancellation()
00042                 {
00043                         shutdown();
00044                         return true;
00045                 }
00046 
00047                 void ApiServer::componentUp()
00048                 {
00049                         try {
00050                                 _dist.start();
00051                         } catch (const ibrcommon::ThreadException &ex) {
00052                                 IBRCOMMON_LOGGER(error) << "failed to start ApiServer\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00053                         }
00054                 }
00055 
00056                 void ApiServer::componentRun()
00057                 {
00058                         try {
00059                                 while (true)
00060                                 {
00061                                         ClientHandler *obj = new ClientHandler(*this, _tcpsrv.accept(), _next_connection_id);
00062                                         _next_connection_id++;
00063 
00064                                         // initialize the object
00065                                         obj->initialize();
00066 
00067                                         // breakpoint
00068                                         ibrcommon::Thread::yield();
00069                                 }
00070                         } catch (std::exception) {
00071                                 // ignore all errors
00072                                 return;
00073                         }
00074                 }
00075 
00076                 void ApiServer::componentDown()
00077                 {
00078                         _dist.shutdown();
00079                         shutdown();
00080                 }
00081 
00082                 void ApiServer::shutdown()
00083                 {
00084                         _dist.stop();
00085                         _tcpsrv.shutdown();
00086                         _tcpsrv.close();
00087                 }
00088 
00089                 void ApiServer::connectionUp(ClientHandler *obj)
00090                 {
00091                         _dist.add(obj);
00092                         IBRCOMMON_LOGGER_DEBUG(5) << "client connection up (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00093                         IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00094                 }
00095 
00096                 void ApiServer::connectionDown(ClientHandler *obj)
00097                 {
00098                         _dist.remove(obj);
00099                         IBRCOMMON_LOGGER_DEBUG(5) << "client connection down (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00100                         IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00101                 }
00102 
00103                 ApiServer::Distributor::Distributor()
00104                 {
00105                         bindEvent(dtn::routing::QueueBundleEvent::className);
00106                 }
00107 
00108                 ApiServer::Distributor::~Distributor()
00109                 {
00110                         unbindEvent(dtn::routing::QueueBundleEvent::className);
00111                         join();
00112                 }
00113 
00114                 void ApiServer::Distributor::add(ClientHandler *obj)
00115                 {
00116                         {
00117                                 ibrcommon::MutexLock l(_lock);
00118                                 _connections.push_back(obj);
00119                         }
00120                         _tasks.push(new QueryBundleTask(obj->id));
00121                 }
00122 
00123                 void ApiServer::Distributor::remove(ClientHandler *obj)
00124                 {
00125                         ibrcommon::MutexLock l(_lock);
00126                         _connections.erase( std::remove( _connections.begin(), _connections.end(), obj), _connections.end() );
00127                 }
00128 
00129                 bool ApiServer::Distributor::__cancellation()
00130                 {
00131                         // cancel the main thread in here
00132                         _tasks.abort();
00133 
00134                         // return true, to signal that no further cancel (the hardway) is needed
00135                         return true;
00136                 }
00137 
00141                 void ApiServer::Distributor::run()
00142                 {
00143                         try
00144                         {
00145                                 while (true)
00146                                 {
00147                                         // get the next task
00148                                         ApiServer::Task *task = _tasks.getnpop(true);
00149 
00150                                         try {
00151                                                 try {
00152                                                         QueryBundleTask &query = dynamic_cast<QueryBundleTask&>(*task);
00153 
00154                                                         IBRCOMMON_LOGGER_DEBUG(60) << "QueryBundleTask: " << query.id << IBRCOMMON_LOGGER_ENDL;
00155 
00156                                                         // get the global storage
00157                                                         BundleStorage &storage = BundleCore::getInstance().getStorage();
00158 
00159                                                         // search for all receiver of this bundle
00160                                                         ibrcommon::MutexLock l(_lock);
00161 
00162                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00163                                                         {
00164                                                                 ClientHandler *handler = (*iter);
00165 
00166                                                                 if (handler->id == query.id)
00167                                                                 {
00168                                                                         const dtn::data::Bundle b = storage.get( handler->getPeer() );
00169 
00170                                                                         // push the bundle to the client
00171                                                                         handler->queue(b);
00172 
00173                                                                         // generate a task for this receiver
00174                                                                         _tasks.push(new QueryBundleTask(query.id));
00175 
00176                                                                         // generate a delete bundle task
00177                                                                         _tasks.push(new RemoveBundleTask(b));
00178                                                                 }
00179                                                         }
00180                                                 } catch (const std::bad_cast&) {};
00181 
00182                                                 try {
00183                                                         ProcessBundleTask &pbt = dynamic_cast<ProcessBundleTask&>(*task);
00184 
00185                                                         IBRCOMMON_LOGGER_DEBUG(60) << "ProcessBundleTask: " << pbt.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00186 
00187                                                         // search for all receiver of this bundle
00188                                                         ibrcommon::MutexLock l(_lock);
00189                                                         bool deleteIt = false;
00190 
00191                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00192                                                         {
00193                                                                 ClientHandler *handler = (*iter);
00194 
00195                                                                 if (handler->getPeer() == pbt.bundle.destination)
00196                                                                 {
00197                                                                         // generate a task for each receiver
00198                                                                         _tasks.push(new TransferBundleTask(pbt.bundle, handler->id));
00199 
00200                                                                         deleteIt = true;
00201                                                                 }
00202                                                         }
00203 
00204                                                         // generate a delete bundle task
00205                                                         if (deleteIt) _tasks.push(new RemoveBundleTask(pbt.bundle));
00206 
00207                                                 } catch (const std::bad_cast&) { };
00208 
00209                                                 try {
00210                                                         TransferBundleTask &transfer = dynamic_cast<TransferBundleTask&>(*task);
00211 
00212                                                         IBRCOMMON_LOGGER_DEBUG(60) << "TransferBundleTask: " << transfer.bundle.toString() << ", id: " << transfer.id << IBRCOMMON_LOGGER_ENDL;
00213 
00214                                                         // get the global storage
00215                                                         BundleStorage &storage = BundleCore::getInstance().getStorage();
00216 
00217                                                         // search for all receiver of this bundle
00218                                                         ibrcommon::MutexLock l(_lock);
00219 
00220                                                         for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00221                                                         {
00222                                                                 ClientHandler *handler = (*iter);
00223 
00224                                                                 try {
00225                                                                         if (handler->id == transfer.id)
00226                                                                         {
00227                                                                                 const dtn::data::Bundle b = storage.get( transfer.bundle );
00228 
00229                                                                                 // push the bundle to the client
00230                                                                                 handler->queue(b);
00231                                                                         }
00232                                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {};
00233                                                         }
00234 
00235                                                 } catch (const std::bad_cast&) { };
00236 
00237                                                 try {
00238                                                         RemoveBundleTask &r = dynamic_cast<RemoveBundleTask&>(*task);
00239 
00240                                                         IBRCOMMON_LOGGER_DEBUG(60) << "RemoveBundleTask: " << r.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00241 
00242                                                         // get the global storage
00243                                                         BundleStorage &storage = BundleCore::getInstance().getStorage();
00244 
00245                                                         storage.remove(r.bundle);
00246 
00247                                                 } catch (const std::bad_cast&) {};
00248                                         } catch (const std::exception&) {};
00249 
00250                                         delete task;
00251 
00252                                         yield();
00253                                 }
00254                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00255                                 IBRCOMMON_LOGGER_DEBUG(10) << "ApiServer::Distributor going down" << IBRCOMMON_LOGGER_ENDL;
00256                         } catch (std::exception) {
00257                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL;
00258                         }
00259                 }
00260 
00264                 void ApiServer::Distributor::raiseEvent(const Event *evt)
00265                 {
00266                         try {
00267                                 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00268                                 _tasks.push(new ApiServer::ProcessBundleTask(queued.bundle));
00269                         } catch (std::bad_cast ex) {
00270 
00271                         }
00272                 }
00273 
00274                 const std::string ApiServer::getName() const
00275                 {
00276                         return "ApiServer";
00277                 }
00278 
00279                 void ApiServer::Distributor::shutdown()
00280                 {
00281                         while (true)
00282                         {
00283                                 ClientHandler *client = NULL;
00284                                 {
00285                                         ibrcommon::MutexLock l(_lock);
00286                                         if (_connections.empty()) break;
00287                                         client = _connections.front();
00288                                         _connections.remove(client);
00289                                 }
00290 
00291                                 client->shutdown();
00292                         }
00293                 }
00294 
00295                 ApiServer::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &b) : bundle(b) {}
00296                 ApiServer::ProcessBundleTask::~ProcessBundleTask(){}
00297 
00298                 ApiServer::TransferBundleTask::TransferBundleTask(const dtn::data::BundleID &b, const size_t i) : bundle(b), id(i) {}
00299                 ApiServer::TransferBundleTask::~TransferBundleTask() {}
00300 
00301                 ApiServer::RemoveBundleTask::RemoveBundleTask(const dtn::data::BundleID &b) : bundle(b) {}
00302                 ApiServer::RemoveBundleTask::~RemoveBundleTask() {}
00303 
00304                 ApiServer::QueryBundleTask::QueryBundleTask(const size_t i) : id(i) {}
00305                 ApiServer::QueryBundleTask::~QueryBundleTask() {}
00306         }
00307 }

Generated on Thu Nov 11 2010 09:49:46 for IBR-DTNSuite by  doxygen 1.7.1