Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "config.h"
00009 #include "ApiServer.h"
00010 #include "core/BundleCore.h"
00011 #include "core/EventSwitch.h"
00012 #include "routing/QueueBundleEvent.h"
00013 #include "ClientHandler.h"
00014 #include <ibrcommon/Logger.h>
00015 #include <typeinfo>
00016 #include <algorithm>
00017
00018 #ifdef HAVE_SQLITE
00019 #include "core/SQLiteBundleStorage.h"
00020 #endif
00021
00022 namespace dtn
00023 {
00024 namespace daemon
00025 {
00026 ApiServer::ApiServer(const ibrcommon::File &socket)
00027 : _tcpsrv(socket), _next_connection_id(1)
00028 {
00029 }
00030
00031 ApiServer::ApiServer(const ibrcommon::vinterface &net, int port)
00032 {
00033 _tcpsrv.bind(net, port);
00034 }
00035
00036 ApiServer::~ApiServer()
00037 {
00038 join();
00039 }
00040
00041 bool ApiServer::__cancellation()
00042 {
00043 shutdown();
00044 return true;
00045 }
00046
00047 void ApiServer::componentUp()
00048 {
00049 _tcpsrv.listen(5);
00050
00051 try {
00052 _dist.start();
00053 } catch (const ibrcommon::ThreadException &ex) {
00054 IBRCOMMON_LOGGER(error) << "failed to start ApiServer\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00055 }
00056 }
00057
00058 void ApiServer::componentRun()
00059 {
00060
00061 Thread::enable_interruption();
00062
00063 try {
00064 while (true)
00065 {
00066 ClientHandler *obj = new ClientHandler(*this, _tcpsrv.accept(), _next_connection_id);
00067 _next_connection_id++;
00068
00069
00070 obj->initialize();
00071
00072
00073 ibrcommon::Thread::yield();
00074 }
00075 } catch (const std::exception&) {
00076
00077 return;
00078 }
00079 }
00080
00081 void ApiServer::componentDown()
00082 {
00083 _dist.shutdown();
00084 shutdown();
00085 }
00086
00087 void ApiServer::shutdown()
00088 {
00089 _dist.stop();
00090 _tcpsrv.shutdown();
00091 _tcpsrv.close();
00092 Thread::interrupt();
00093 }
00094
00095 void ApiServer::connectionUp(ClientHandler *obj)
00096 {
00097 _dist.add(obj);
00098 IBRCOMMON_LOGGER_DEBUG(5) << "client connection up (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00099 IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00100 }
00101
00102 void ApiServer::connectionDown(ClientHandler *obj)
00103 {
00104 _dist.remove(obj);
00105 IBRCOMMON_LOGGER_DEBUG(5) << "client connection down (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00106 IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00107 }
00108
00109 ApiServer::Distributor::Distributor()
00110 {
00111 bindEvent(dtn::routing::QueueBundleEvent::className);
00112 }
00113
00114 ApiServer::Distributor::~Distributor()
00115 {
00116 unbindEvent(dtn::routing::QueueBundleEvent::className);
00117 join();
00118 }
00119
00120 void ApiServer::Distributor::add(ClientHandler *obj)
00121 {
00122 {
00123 ibrcommon::MutexLock l(_connections_cond);
00124 _connections.push_back(obj);
00125 _connections_cond.signal(true);
00126 }
00127 _tasks.push(new QueryBundleTask(obj->id));
00128 }
00129
00130 void ApiServer::Distributor::remove(ClientHandler *obj)
00131 {
00132 ibrcommon::MutexLock l(_connections_cond);
00133 _connections.erase( std::remove( _connections.begin(), _connections.end(), obj), _connections.end() );
00134 _connections_cond.signal(true);
00135 }
00136
00137 bool ApiServer::Distributor::__cancellation()
00138 {
00139
00140 _tasks.abort();
00141
00142
00143 return true;
00144 }
00145
00149 void ApiServer::Distributor::run()
00150 {
00151 #ifdef HAVE_SQLITE
00152 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery
00153 #else
00154 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback
00155 #endif
00156 {
00157 public:
00158 BundleFilter(const dtn::data::EID &destination)
00159 : _destination(destination)
00160 {};
00161
00162 virtual ~BundleFilter() {};
00163
00164 virtual size_t limit() const { return 5; };
00165
00166 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const
00167 {
00168 if (_destination != meta.destination)
00169 {
00170 return false;
00171 }
00172
00173 return true;
00174 };
00175
00176 #ifdef HAVE_SQLITE
00177 const std::string getWhere() const
00178 {
00179 return "destination = ?";
00180 };
00181
00182 size_t bind(sqlite3_stmt *st, size_t offset) const
00183 {
00184 sqlite3_bind_text(st, offset, _destination.getString().c_str(), _destination.getString().size(), SQLITE_TRANSIENT);
00185 return offset + 1;
00186 }
00187 #endif
00188
00189 private:
00190 const dtn::data::EID &_destination;
00191 };
00192
00193 try
00194 {
00195 while (true)
00196 {
00197
00198 ApiServer::Task *task = _tasks.getnpop(true);
00199
00200 try {
00201 try {
00202 QueryBundleTask &query = dynamic_cast<QueryBundleTask&>(*task);
00203
00204 IBRCOMMON_LOGGER_DEBUG(60) << "QueryBundleTask: " << query.id << IBRCOMMON_LOGGER_ENDL;
00205
00206
00207 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00208
00209
00210 ibrcommon::MutexLock l(_connections_cond);
00211
00212 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00213 {
00214 ClientHandler *handler = (*iter);
00215
00216 if (handler->id == query.id)
00217 {
00218 BundleFilter filter(handler->getPeer());
00219 const std::list<dtn::data::MetaBundle> list = storage.get( filter );
00220
00221 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++)
00222 {
00223 try {
00224 const dtn::data::Bundle b = storage.get(*iter);
00225
00226
00227 handler->queue(b);
00228
00229
00230 _tasks.push(new RemoveBundleTask(b));
00231 }
00232 catch (const dtn::core::BundleStorage::BundleLoadException&)
00233 {
00234 }
00235 catch (const dtn::core::BundleStorage::NoBundleFoundException&)
00236 {
00237 break;
00238 }
00239 }
00240
00241 if (list.size() > 0)
00242 {
00243
00244 _tasks.push(new QueryBundleTask(query.id));
00245 }
00246 }
00247 }
00248 } catch (const std::bad_cast&) {};
00249
00250 try {
00251 ProcessBundleTask &pbt = dynamic_cast<ProcessBundleTask&>(*task);
00252
00253 IBRCOMMON_LOGGER_DEBUG(60) << "ProcessBundleTask: " << pbt.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00254
00255
00256 ibrcommon::MutexLock l(_connections_cond);
00257 bool deleteIt = false;
00258
00259 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00260 {
00261 ClientHandler *handler = (*iter);
00262
00263 if (handler->getPeer() == pbt.bundle.destination)
00264 {
00265
00266 _tasks.push(new TransferBundleTask(pbt.bundle, handler->id));
00267
00268 deleteIt = true;
00269 }
00270 }
00271
00272
00273 if (deleteIt) _tasks.push(new RemoveBundleTask(pbt.bundle));
00274
00275 } catch (const std::bad_cast&) { };
00276
00277 try {
00278 TransferBundleTask &transfer = dynamic_cast<TransferBundleTask&>(*task);
00279
00280 IBRCOMMON_LOGGER_DEBUG(60) << "TransferBundleTask: " << transfer.bundle.toString() << ", id: " << transfer.id << IBRCOMMON_LOGGER_ENDL;
00281
00282
00283 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00284
00285
00286 ibrcommon::MutexLock l(_connections_cond);
00287
00288 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00289 {
00290 ClientHandler *handler = (*iter);
00291
00292 try {
00293 if (handler->id == transfer.id)
00294 {
00295 const dtn::data::Bundle b = storage.get( transfer.bundle );
00296
00297
00298 handler->queue(b);
00299 }
00300 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {};
00301 }
00302
00303 } catch (const std::bad_cast&) { };
00304
00305 try {
00306 RemoveBundleTask &r = dynamic_cast<RemoveBundleTask&>(*task);
00307
00308 IBRCOMMON_LOGGER_DEBUG(60) << "RemoveBundleTask: " << r.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00309
00310
00311 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00312
00313 storage.remove(r.bundle);
00314
00315 } catch (const std::bad_cast&) {};
00316 } catch (const std::exception&) {};
00317
00318 delete task;
00319
00320 yield();
00321 }
00322 } catch (const ibrcommon::QueueUnblockedException &ex) {
00323 IBRCOMMON_LOGGER_DEBUG(10) << "ApiServer::Distributor going down" << IBRCOMMON_LOGGER_ENDL;
00324 } catch (const std::exception&) {
00325 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL;
00326 }
00327 }
00328
00332 void ApiServer::Distributor::raiseEvent(const dtn::core::Event *evt)
00333 {
00334 try {
00335 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00336 _tasks.push(new ApiServer::ProcessBundleTask(queued.bundle));
00337 } catch (const std::bad_cast&) {
00338
00339 }
00340 }
00341
00342 const std::string ApiServer::getName() const
00343 {
00344 return "ApiServer";
00345 }
00346
00347 void ApiServer::Distributor::shutdown()
00348 {
00349 closeAll();
00350
00351
00352 ibrcommon::MutexLock l(_connections_cond);
00353 while (_connections.size() > 0) _connections_cond.wait();
00354 }
00355
00356 void ApiServer::Distributor::closeAll()
00357 {
00358
00359 ibrcommon::MutexLock l(_connections_cond);
00360 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00361 {
00362 ClientHandler &conn = *(*iter);
00363
00364
00365 conn.shutdown();
00366 }
00367 }
00368
00369 ApiServer::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &b) : bundle(b) {}
00370 ApiServer::ProcessBundleTask::~ProcessBundleTask(){}
00371
00372 ApiServer::TransferBundleTask::TransferBundleTask(const dtn::data::BundleID &b, const size_t i) : bundle(b), id(i) {}
00373 ApiServer::TransferBundleTask::~TransferBundleTask() {}
00374
00375 ApiServer::RemoveBundleTask::RemoveBundleTask(const dtn::data::BundleID &b) : bundle(b) {}
00376 ApiServer::RemoveBundleTask::~RemoveBundleTask() {}
00377
00378 ApiServer::QueryBundleTask::QueryBundleTask(const size_t i) : id(i) {}
00379 ApiServer::QueryBundleTask::~QueryBundleTask() {}
00380 }
00381 }