00001
00002
00003
00004
00005
00006
00007
00008 #include "ApiServer.h"
00009 #include "core/BundleCore.h"
00010 #include "core/EventSwitch.h"
00011 #include "routing/QueueBundleEvent.h"
00012 #include "ClientHandler.h"
00013 #include <ibrcommon/Logger.h>
00014 #include <typeinfo>
00015 #include <algorithm>
00016
00017 using namespace dtn::data;
00018 using namespace dtn::core;
00019 using namespace dtn::streams;
00020 using namespace std;
00021
00022 namespace dtn
00023 {
00024 namespace daemon
00025 {
00026 ApiServer::ApiServer(ibrcommon::NetInterface net, int port)
00027 : dtn::net::GenericServer<ClientHandler>(), _tcpsrv(net, port), _dist(_connections, _connection_lock)
00028 {
00029 _dist.start();
00030 }
00031
00032 ApiServer::~ApiServer()
00033 {
00034 if (isRunning())
00035 {
00036 componentDown();
00037 }
00038 }
00039
00040 ClientHandler* ApiServer::accept()
00041 {
00042 try {
00043
00044 ClientHandler *handler = new ClientHandler(_tcpsrv.accept());
00045
00046
00047 handler->start();
00048
00049 return handler;
00050 } catch (ibrcommon::SocketException ex) {
00051
00052 return NULL;
00053 }
00054 }
00055
00056 void ApiServer::listen()
00057 {
00058
00059 }
00060
00061 void ApiServer::shutdown()
00062 {
00063 shutdownAll();
00064 }
00065
00066 void ApiServer::connectionUp(ClientHandler *conn)
00067 {
00068 ibrcommon::MutexLock l(_connection_lock);
00069 _connections.push_back(conn);
00070 IBRCOMMON_LOGGER_DEBUG(5) << "Client connection up" << IBRCOMMON_LOGGER_ENDL;
00071 }
00072
00073 void ApiServer::connectionDown(ClientHandler *conn)
00074 {
00075 ibrcommon::MutexLock l(_connection_lock);
00076 _connections.erase( std::remove(_connections.begin(), _connections.end(), conn) );
00077 IBRCOMMON_LOGGER_DEBUG(5) << "Client connection down" << IBRCOMMON_LOGGER_ENDL;
00078 }
00079
00080 ApiServer::Distributor::Distributor(std::list<ClientHandler*> &connections, ibrcommon::Mutex &lock)
00081 : _running(true), _lock(lock), _connections(connections)
00082 {
00083 bindEvent(dtn::routing::QueueBundleEvent::className);
00084 }
00085
00086 ApiServer::Distributor::~Distributor()
00087 {
00088 unbindEvent(dtn::routing::QueueBundleEvent::className);
00089 shutdown();
00090 }
00091
00095 void ApiServer::Distributor::run()
00096 {
00097 try
00098 {
00099 while (_running)
00100 {
00101
00102 dtn::data::MetaBundle mb = _received.blockingpop();
00103
00104
00105 {
00106 ibrcommon::MutexLock l(_lock);
00107 std::queue<ClientHandler*> receivers;
00108
00109 for (std::list<ClientHandler*>::iterator iter = _connections.begin(); iter != _connections.end(); iter++)
00110 {
00111 ClientHandler *handler = (*iter);
00112 if (handler->getPeer() == mb.destination)
00113 {
00114 receivers.push(handler);
00115 }
00116 }
00117
00118 if (!receivers.empty())
00119 {
00120 try {
00121 BundleStorage &storage = BundleCore::getInstance().getStorage();
00122 dtn::data::Bundle bundle = storage.get( mb );
00123
00124 while (!receivers.empty())
00125 {
00126 ClientHandler *handler = receivers.front();
00127 IBRCOMMON_LOGGER_DEBUG(5) << "Transfer bundle " << mb.toString() << " to client " << handler->getPeer().getString() << IBRCOMMON_LOGGER_ENDL;
00128
00129 try {
00130
00131 (*handler) << bundle;
00132 } catch (ibrcommon::IOException ex) {
00133 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException" << IBRCOMMON_LOGGER_ENDL;
00134 handler->shutdown();
00135 } catch (dtn::InvalidDataException ex) {
00136 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException" << IBRCOMMON_LOGGER_ENDL;
00137 handler->shutdown();
00138 } catch (...) {
00139 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error!" << IBRCOMMON_LOGGER_ENDL;
00140 handler->shutdown();
00141 }
00142
00143 receivers.pop();
00144 }
00145
00146
00147 storage.remove( mb );
00148 } catch (dtn::core::BundleStorage::NoBundleFoundException ex) {
00149 IBRCOMMON_LOGGER_DEBUG(10) << "API: NoBundleFoundException; BundleID: " << mb.toString() << IBRCOMMON_LOGGER_ENDL;
00150 }
00151 }
00152 }
00153 }
00154 } catch (...) {
00155 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected error or shutdown" << IBRCOMMON_LOGGER_ENDL;
00156 }
00157 }
00158
00162 void ApiServer::Distributor::shutdown()
00163 {
00164 {
00165 ibrcommon::MutexLock l(_received);
00166 _running = false;
00167 _received.signal(true);
00168 }
00169 join();
00170 }
00171
00175 void ApiServer::Distributor::raiseEvent(const Event *evt)
00176 {
00177 try {
00178 const dtn::routing::QueueBundleEvent &queued = dynamic_cast<const dtn::routing::QueueBundleEvent&>(*evt);
00179 _received.push(queued.bundle);
00180 } catch (std::bad_cast ex) {
00181
00182 }
00183 }
00184 }
00185 }