Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ApiServer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/EventSwitch.h"
00011 #include "routing/QueueBundleEvent.h"
00012 #include "ClientHandler.h"
00013 #include <ibrcommon/Logger.h>
00014 #include <typeinfo>
00015 #include <algorithm>
00016
00017 using namespace dtn::data;
00018 using namespace dtn::core;
00019 using namespace dtn::streams;
00020 using namespace std;
00021
00022 namespace dtn
00023 {
00024 namespace daemon
00025 {
00026 ApiServer::ApiServer(const ibrcommon::File &socket)
00027 : _tcpsrv(socket), _dist(), _next_connection_id(1)
00028 {
00029 }
00030
00031 ApiServer::ApiServer(ibrcommon::NetInterface net, int port)
00032 : _tcpsrv(net, port), _dist()
00033 {
00034 }
00035
00036 ApiServer::~ApiServer()
00037 {
00038 join();
00039 }
00040
00041 bool ApiServer::__cancellation()
00042 {
00043 shutdown();
00044 return true;
00045 }
00046
00047 void ApiServer::componentUp()
00048 {
00049 try {
00050 _dist.start();
00051 } catch (const ibrcommon::ThreadException &ex) {
00052 IBRCOMMON_LOGGER(error) << "failed to start ApiServer\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00053 }
00054 }
00055
00056 void ApiServer::componentRun()
00057 {
00058 try {
00059 while (true)
00060 {
00061 ClientHandler *obj = new ClientHandler(*this, _tcpsrv.accept(), _next_connection_id);
00062 _next_connection_id++;
00063
00064
00065 obj->initialize();
00066
00067
00068 ibrcommon::Thread::yield();
00069 }
00070 } catch (std::exception) {
00071
00072 return;
00073 }
00074 }
00075
00076 void ApiServer::componentDown()
00077 {
00078 _dist.shutdown();
00079 shutdown();
00080 }
00081
00082 void ApiServer::shutdown()
00083 {
00084 _dist.stop();
00085 _tcpsrv.shutdown();
00086 _tcpsrv.close();
00087 }
00088
00089 void ApiServer::connectionUp(ClientHandler *obj)
00090 {
00091 _dist.add(obj);
00092 IBRCOMMON_LOGGER_DEBUG(5) << "client connection up (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00093 IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00094 }
00095
00096 void ApiServer::connectionDown(ClientHandler *obj)
00097 {
00098 _dist.remove(obj);
00099 IBRCOMMON_LOGGER_DEBUG(5) << "client connection down (id: " << obj->id << ")" << IBRCOMMON_LOGGER_ENDL;
00100 IBRCOMMON_LOGGER_DEBUG(60) << "current open connections " << _dist._connections.size() << IBRCOMMON_LOGGER_ENDL;
00101 }
00102
00103 ApiServer::Distributor::Distributor()
00104 {
00105 bindEvent(dtn::routing::QueueBundleEvent::className);
00106 }
00107
00108 ApiServer::Distributor::~Distributor()
00109 {
00110 unbindEvent(dtn::routing::QueueBundleEvent::className);
00111 join();
00112 }
00113
00114 void ApiServer::Distributor::add(ClientHandler *obj)
00115 {
00116 {
00117 ibrcommon::MutexLock l(_lock);
00118 _connections.push_back(obj);
00119 }
00120 _tasks.push(new QueryBundleTask(obj->id));
00121 }
00122
00123 void ApiServer::Distributor::remove(ClientHandler *obj)
00124 {
00125 ibrcommon::MutexLock l(_lock);
00126 _connections.erase( std::remove( _connections.begin(), _connections.end(), obj), _connections.end() );
00127 }
00128
00129 bool ApiServer::Distributor::__cancellation()
00130 {
00131
00132 _tasks.abort();
00133
00134
00135 return true;
00136 }
00137
00141 void ApiServer::Distributor::run()
00142 {
00143 try
00144 {
00145 while (true)
00146 {
00147
00148 ApiServer::Task *task = _tasks.getnpop(true);
00149
00150 try {
00151 try {
00152 QueryBundleTask &query = dynamic_cast<QueryBundleTask&>(*task);
00153
00154 IBRCOMMON_LOGGER_DEBUG(60) << "QueryBundleTask: " << query.id << IBRCOMMON_LOGGER_ENDL;
00155
00156
00157 BundleStorage &storage = BundleCore::getInstance().getStorage();
00158
00159
00160 ibrcommon::MutexLock l(_lock);
00161
00162 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00163 {
00164 ClientHandler *handler = (*iter);
00165
00166 if (handler->id == query.id)
00167 {
00168 const dtn::data::Bundle b = storage.get( handler->getPeer() );
00169
00170
00171 handler->queue(b);
00172
00173
00174 _tasks.push(new QueryBundleTask(query.id));
00175
00176
00177 _tasks.push(new RemoveBundleTask(b));
00178 }
00179 }
00180 } catch (const std::bad_cast&) {};
00181
00182 try {
00183 ProcessBundleTask &pbt = dynamic_cast<ProcessBundleTask&>(*task);
00184
00185 IBRCOMMON_LOGGER_DEBUG(60) << "ProcessBundleTask: " << pbt.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00186
00187
00188 ibrcommon::MutexLock l(_lock);
00189 bool deleteIt = false;
00190
00191 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00192 {
00193 ClientHandler *handler = (*iter);
00194
00195 if (handler->getPeer() == pbt.bundle.destination)
00196 {
00197
00198 _tasks.push(new TransferBundleTask(pbt.bundle, handler->id));
00199
00200 deleteIt = true;
00201 }
00202 }
00203
00204
00205 if (deleteIt) _tasks.push(new RemoveBundleTask(pbt.bundle));
00206
00207 } catch (const std::bad_cast&) { };
00208
00209 try {
00210 TransferBundleTask &transfer = dynamic_cast<TransferBundleTask&>(*task);
00211
00212 IBRCOMMON_LOGGER_DEBUG(60) << "TransferBundleTask: " << transfer.bundle.toString() << ", id: " << transfer.id << IBRCOMMON_LOGGER_ENDL;
00213
00214
00215 BundleStorage &storage = BundleCore::getInstance().getStorage();
00216
00217
00218 ibrcommon::MutexLock l(_lock);
00219
00220 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00221 {
00222 ClientHandler *handler = (*iter);
00223
00224 try {
00225 if (handler->id == transfer.id)
00226 {
00227 const dtn::data::Bundle b = storage.get( transfer.bundle );
00228
00229
00230 handler->queue(b);
00231 }
00232 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {};
00233 }
00234
00235 } catch (const std::bad_cast&) { };
00236
00237 try {
00238 RemoveBundleTask &r = dynamic_cast<RemoveBundleTask&>(*task);
00239
00240 IBRCOMMON_LOGGER_DEBUG(60) << "RemoveBundleTask: " << r.bundle.toString() << IBRCOMMON_LOGGER_ENDL;
00241
00242
00243 BundleStorage &storage = BundleCore::getInstance().getStorage();
00244
00245 storage.remove(r.bundle);
00246
00247 } catch (const std::bad_cast&) {};
00248 } catch (const std::exception&) {};
00249
00250 delete task;
00251
00252 yield();
00253 }
00254 } catch (const ibrcommon::QueueUnblockedException &ex) {
00255 IBRCOMMON_LOGGER_DEBUG(10) << "ApiServer::Distributor going down" << IBRCOMMON_LOGGER_ENDL;
00256 } catch (std::exception) {
00257 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL;
00258 }
00259 }
00260
00264 void ApiServer::Distributor::raiseEvent(const Event *evt)
00265 {
00266 try {
00267 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00268 _tasks.push(new ApiServer::ProcessBundleTask(queued.bundle));
00269 } catch (std::bad_cast ex) {
00270
00271 }
00272 }
00273
00274 const std::string ApiServer::getName() const
00275 {
00276 return "ApiServer";
00277 }
00278
00279 void ApiServer::Distributor::shutdown()
00280 {
00281 while (true)
00282 {
00283 ClientHandler *client = NULL;
00284 {
00285 ibrcommon::MutexLock l(_lock);
00286 if (_connections.empty()) break;
00287 client = _connections.front();
00288 _connections.remove(client);
00289 }
00290
00291 client->shutdown();
00292 }
00293 }
00294
00295 ApiServer::ProcessBundleTask::ProcessBundleTask(const dtn::data::MetaBundle &b) : bundle(b) {}
00296 ApiServer::ProcessBundleTask::~ProcessBundleTask(){}
00297
00298 ApiServer::TransferBundleTask::TransferBundleTask(const dtn::data::BundleID &b, const size_t i) : bundle(b), id(i) {}
00299 ApiServer::TransferBundleTask::~TransferBundleTask() {}
00300
00301 ApiServer::RemoveBundleTask::RemoveBundleTask(const dtn::data::BundleID &b) : bundle(b) {}
00302 ApiServer::RemoveBundleTask::~RemoveBundleTask() {}
00303
00304 ApiServer::QueryBundleTask::QueryBundleTask(const size_t i) : id(i) {}
00305 ApiServer::QueryBundleTask::~QueryBundleTask() {}
00306 }
00307 }