|
IBR-DTNSuite 0.6
|
00001 /* 00002 * ApiServer.cpp 00003 * 00004 * Created on: 24.06.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "api/ApiServer.h" 00010 #include "api/ClientHandler.h" 00011 #include "core/BundleCore.h" 00012 #include "core/EventSwitch.h" 00013 #include "routing/QueueBundleEvent.h" 00014 #include <ibrcommon/Logger.h> 00015 #include <typeinfo> 00016 #include <algorithm> 00017 00018 #ifdef HAVE_SQLITE 00019 #include "core/SQLiteBundleStorage.h" 00020 #endif 00021 00022 namespace dtn 00023 { 00024 namespace api 00025 { 00026 ApiServer::ApiServer(const ibrcommon::File &socket) 00027 : _tcpsrv(socket), _next_connection_id(1) 00028 { 00029 } 00030 00031 ApiServer::ApiServer(const ibrcommon::vinterface &net, int port) 00032 { 00033 _tcpsrv.bind(net, port); 00034 } 00035 00036 ApiServer::~ApiServer() 00037 { 00038 join(); 00039 } 00040 00041 bool ApiServer::__cancellation() 00042 { 00043 shutdown(); 00044 return true; 00045 } 00046 00047 void ApiServer::componentUp() 00048 { 00049 _tcpsrv.listen(5); 00050 00051 try { 00052 _dist.start(); 00053 } catch (const ibrcommon::ThreadException &ex) { 00054 IBRCOMMON_LOGGER(error) << "failed to start ApiServer\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00055 } 00056 } 00057 00058 void ApiServer::componentRun() 00059 { 00060 try { 00061 while (true) 00062 { 00063 ClientHandler *obj = new ClientHandler(*this, _tcpsrv.accept(), _next_connection_id); 00064 _next_connection_id++; 00065 00066 // initialize the object 00067 obj->initialize(); 00068 00069 // breakpoint 00070 ibrcommon::Thread::yield(); 00071 } 00072 } catch (const std::exception&) { 00073 // ignore all errors 00074 return; 00075 } 00076 } 00077 00078 void ApiServer::componentDown() 00079 { 00080 _dist.shutdown(); 00081 shutdown(); 00082 } 00083 00084 void ApiServer::shutdown() 00085 { 00086 _dist.stop(); 00087 _tcpsrv.shutdown(); 00088 _tcpsrv.close(); 00089 } 00090 00091 void ApiServer::connectionUp(ClientHandler *obj) 00092 { 00093 _dist.add(obj); 00094 IBRCOMMON_LOGGER_DEBUG(5) << "client connection up (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL; 00095 IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL; 00096 } 00097 00098 void ApiServer::connectionDown(ClientHandler *obj) 00099 { 00100 _dist.remove(obj); 00101 IBRCOMMON_LOGGER_DEBUG(5) << "client connection down (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL; 00102 IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL; 00103 } 00104 00105 ApiServer::Distributor::Distributor() 00106 { 00107 bindEvent(dtn::routing::QueueBundleEvent::className); 00108 } 00109 00110 ApiServer::Distributor::~Distributor() 00111 { 00112 unbindEvent(dtn::routing::QueueBundleEvent::className); 00113 join(); 00114 } 00115 00116 void ApiServer::Distributor::add(ClientHandler *obj) 00117 { 00118 { 00119 ibrcommon::MutexLock l(_connections_cond); 00120 _connections.push_back(obj); 00121 _connections_cond.signal(true); 00122 } 00123 _tasks.push(new QueryBundleTask(obj->id)); 00124 } 00125 00126 void ApiServer::Distributor::remove(ClientHandler *obj) 00127 { 00128 ibrcommon::MutexLock l(_connections_cond); 00129 _connections.erase( std::remove( _connections.begin(), _connections.end(), obj), _connections.end() ); 00130 _connections_cond.signal(true); 00131 } 00132 00133 bool ApiServer::Distributor::__cancellation() 00134 { 00135 // cancel the main thread in here 00136 _tasks.abort(); 00137 00138 // return true, to signal that no further cancel (the hardway) is needed 00139 return true; 00140 } 00141 00145 void ApiServer::Distributor::run() 00146 { 00147 #ifdef HAVE_SQLITE 00148 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback, public dtn::core::SQLiteBundleStorage::SQLBundleQuery 00149 #else 00150 class BundleFilter : public dtn::core::BundleStorage::BundleFilterCallback 00151 #endif 00152 { 00153 public: 00154 BundleFilter(const dtn::data::EID &destination) 00155 : _destination(destination) 00156 {}; 00157 00158 virtual ~BundleFilter() {}; 00159 00160 virtual size_t limit() const { return 5; }; 00161 00162 virtual bool shouldAdd(const dtn::data::MetaBundle &meta) const 00163 { 00164 if (_destination != meta.destination) 00165 { 00166 return false; 00167 } 00168 00169 return true; 00170 }; 00171 00172 #ifdef HAVE_SQLITE 00173 const std::string getWhere() const 00174 { 00175 return "destination = ?"; 00176 }; 00177 00178 size_t bind(sqlite3_stmt *st, size_t offset) const 00179 { 00180 sqlite3_bind_text(st, offset, _destination.getString().c_str(), _destination.getString().size(), SQLITE_TRANSIENT); 00181 return offset + 1; 00182 } 00183 #endif 00184 00185 private: 00186 const dtn::data::EID &_destination; 00187 }; 00188 00189 try 00190 { 00191 while (true) 00192 { 00193 // get the next task 00194 ApiServer::Task *task = _tasks.getnpop(true); 00195 00196 try { 00197 try { 00198 QueryBundleTask &query = dynamic_cast<QueryBundleTask&>(*task); 00199 00200 IBRCOMMON_LOGGER_DEBUG(60) << "QueryBundleTask: " << query.id << IBRCOMMON_LOGGER_ENDL; 00201 00202 // get the global storage 00203 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00204 00205 // lock the connection list 00206 ibrcommon::MutexLock l(_connections_cond); 00207 00208 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00209 { 00210 ClientHandler *handler = (*iter); 00211 00212 if (handler->id == query.id) 00213 { 00214 BundleFilter filter(handler->getPeer()); 00215 const std::list<dtn::data::MetaBundle> list = storage.get( filter ); 00216 00217 for (std::list<dtn::data::MetaBundle>::const_iterator iter = list.begin(); iter != list.end(); iter++) 00218 { 00219 try { 00220 const dtn::data::Bundle b = storage.get(*iter); 00221 00222 // push the bundle to the client 00223 handler->queue(b); 00224 00225 // generate a delete bundle task 00226 _tasks.push(new RemoveBundleTask(b)); 00227 } 00228 catch (const dtn::core::BundleStorage::BundleLoadException&) 00229 { 00230 } 00231 catch (const dtn::core::BundleStorage::NoBundleFoundException&) 00232 { 00233 break; 00234 } 00235 } 00236 00237 if (list.size() > 0) 00238 { 00239 // generate a task for this receiver 00240 _tasks.push(new QueryBundleTask(query.id)); 00241 } 00242 } 00243 } 00244 } catch (const std::bad_cast&) {}; 00245 00246 try { 00247 ProcessBundleTask &pbt = dynamic_cast<ProcessBundleTask&>(*task); 00248 00249 IBRCOMMON_LOGGER_DEBUG(60) << "ProcessBundleTask: " << pbt.bundle.toString() << IBRCOMMON_LOGGER_ENDL; 00250 00251 // search for all receiver of this bundle 00252 ibrcommon::MutexLock l(_connections_cond); 00253 bool deleteIt = false; 00254 00255 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00256 { 00257 ClientHandler *handler = (*iter); 00258 00259 if (handler->getPeer() == pbt.bundle.destination) 00260 { 00261 // generate a task for each receiver 00262 _tasks.push(new TransferBundleTask(pbt.bundle, handler->id)); 00263 00264 deleteIt = true; 00265 } 00266 } 00267 00268 // generate a delete bundle task 00269 if (deleteIt) _tasks.push(new RemoveBundleTask(pbt.bundle)); 00270 00271 } catch (const std::bad_cast&) { }; 00272 00273 try { 00274 TransferBundleTask &transfer = dynamic_cast<TransferBundleTask&>(*task); 00275 00276 IBRCOMMON_LOGGER_DEBUG(60) << "TransferBundleTask: " << transfer.bundle.toString() << ", id: " << transfer.id << IBRCOMMON_LOGGER_ENDL; 00277 00278 // get the global storage 00279 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00280 00281 // search for all receiver of this bundle 00282 ibrcommon::MutexLock l(_connections_cond); 00283 00284 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00285 { 00286 ClientHandler *handler = (*iter); 00287 00288 try { 00289 if (handler->id == transfer.id) 00290 { 00291 const dtn::data::Bundle b = storage.get( transfer.bundle ); 00292 00293 // push the bundle to the client 00294 handler->queue(b); 00295 } 00296 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {}; 00297 } 00298 00299 } catch (const std::bad_cast&) { }; 00300 00301 try { 00302 RemoveBundleTask &r = dynamic_cast<RemoveBundleTask&>(*task); 00303 00304 IBRCOMMON_LOGGER_DEBUG(60) << "RemoveBundleTask: " << r.bundle.toString() << IBRCOMMON_LOGGER_ENDL; 00305 00306 // get the global storage 00307 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage(); 00308 00309 storage.remove(r.bundle); 00310 00311 } catch (const std::bad_cast&) {}; 00312 } catch (const std::exception&) {}; 00313 00314 delete task; 00315 00316 yield(); 00317 } 00318 } catch (const ibrcommon::QueueUnblockedException &ex) { 00319 IBRCOMMON_LOGGER_DEBUG(10) << "ApiServer::Distributor going down" << IBRCOMMON_LOGGER_ENDL; 00320 } catch (const std::exception&) { 00321 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL; 00322 } 00323 } 00324 00328 void ApiServer::Distributor::raiseEvent(const dtn::core::Event *evt) 00329 { 00330 try { 00331 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt); 00332 _tasks.push(new ApiServer::ProcessBundleTask(queued.bundle)); 00333 } catch (const std::bad_cast&) { 00334 00335 } 00336 } 00337 00338 const std::string ApiServer::getName() const 00339 { 00340 return "ApiServer"; 00341 } 00342 00343 void ApiServer::Distributor::shutdown() 00344 { 00345 closeAll(); 00346 00347 // wait until all client connections are down 00348 ibrcommon::MutexLock l(_connections_cond); 00349 while (_connections.size() > 0) _connections_cond.wait(); 00350 } 00351 00352 void ApiServer::Distributor::closeAll() 00353 { 00354 // search for an existing connection 00355 ibrcommon::MutexLock l(_connections_cond); 00356 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++) 00357 { 00358 ClientHandler &conn = *(*iter); 00359 00360 // close the connection immediately 00361 conn.shutdown(); 00362 } 00363 } 00364 00365 ApiServer::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &b) : bundle(b) {} 00366 ApiServer::ProcessBundleTask::~ProcessBundleTask(){} 00367 00368 ApiServer::TransferBundleTask::TransferBundleTask(const dtn::data::BundleID &b, const size_t i) : bundle(b), id(i) {} 00369 ApiServer::TransferBundleTask::~TransferBundleTask() {} 00370 00371 ApiServer::RemoveBundleTask::RemoveBundleTask(const dtn::data::BundleID &b) : bundle(b) {} 00372 ApiServer::RemoveBundleTask::~RemoveBundleTask() {} 00373 00374 ApiServer::QueryBundleTask::QueryBundleTask(const size_t i) : id(i) {} 00375 ApiServer::QueryBundleTask::~QueryBundleTask() {} 00376 } 00377 }