Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ClientHandler.h"
00009 #include "Configuration.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleCore.h"
00012 #include "net/BundleReceivedEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include <ibrdtn/streams/StreamContactHeader.h>
00015 #include <ibrdtn/data/Serializer.h>
00016 #include <iostream>
00017 #include <ibrcommon/Logger.h>
00018
00019 using namespace dtn::data;
00020 using namespace dtn::streams;
00021 using namespace dtn::core;
00022
00023 namespace dtn
00024 {
00025 namespace daemon
00026 {
00027 ClientHandler::ClientHandler(ApiServerInterface &srv, ibrcommon::tcpstream *stream, size_t i)
00028 : ibrcommon::DetachedThread(0), id(i), _srv(srv), _sender(*this), _stream(stream), _connection(*this, *_stream)
00029 {
00030 _connection.exceptions(std::ios::badbit | std::ios::eofbit);
00031
00032 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00033 {
00034 stream->enableNoDelay();
00035 }
00036 }
00037
00038 ClientHandler::~ClientHandler()
00039 {
00040 _sender.join();
00041 }
00042
00043 const dtn::data::EID& ClientHandler::getPeer() const
00044 {
00045 return _eid;
00046 }
00047
00048 void ClientHandler::eventShutdown(StreamConnection::ConnectionShutdownCases)
00049 {
00050 }
00051
00052 void ClientHandler::eventTimeout()
00053 {
00054 }
00055
00056 void ClientHandler::eventError()
00057 {
00058 }
00059
00060 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00061 {
00062 if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00063 {
00064
00065
00066 _eid = BundleCore::local + "." + header._localeid.getNode();
00067 }
00068 else
00069 {
00070
00071 _eid = BundleCore::local + "/" + header._localeid.getNode();
00072 }
00073
00074 _srv.connectionUp(this);
00075 }
00076
00077 void ClientHandler::eventConnectionDown()
00078 {
00079 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00080
00081 try {
00082
00083 _sender.stop();
00084 } catch (const ibrcommon::ThreadException &ex) {
00085 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00086 }
00087 }
00088
00089 void ClientHandler::eventBundleRefused()
00090 {
00091 try {
00092 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00093
00094
00095 _lastack = 0;
00096
00097 } catch (ibrcommon::QueueUnblockedException) {
00098
00099 }
00100 }
00101
00102 void ClientHandler::eventBundleForwarded()
00103 {
00104 try {
00105 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00106
00107
00108 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00109
00110
00111 _lastack = 0;
00112
00113 } catch (ibrcommon::QueueUnblockedException) {
00114
00115 }
00116 }
00117
00118 void ClientHandler::eventBundleAck(size_t ack)
00119 {
00120 _lastack = ack;
00121 }
00122
00123 void ClientHandler::initialize()
00124 {
00125 try {
00126
00127 start();
00128 } catch (const ibrcommon::ThreadException &ex) {
00129 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00130 }
00131 }
00132
00133 void ClientHandler::shutdown()
00134 {
00135
00136 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00137
00138 try {
00139
00140 this->stop();
00141 } catch (const ibrcommon::ThreadException &ex) {
00142 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00143 }
00144 }
00145
00146 void ClientHandler::finally()
00147 {
00148 IBRCOMMON_LOGGER_DEBUG(60) << "ClientHandler down" << IBRCOMMON_LOGGER_ENDL;
00149
00150
00151 _srv.connectionDown(this);
00152
00153
00154 (*_stream).close();
00155
00156 try {
00157
00158 _sender.stop();
00159 } catch (std::exception) { };
00160 }
00161
00162 void ClientHandler::run()
00163 {
00164 try {
00165 char flags = 0;
00166
00167
00168 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00169
00170
00171 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00172
00173
00174 _sender.start();
00175
00176 while (true)
00177 {
00178 dtn::data::Bundle bundle;
00179 dtn::data::DefaultDeserializer(_connection) >> bundle;
00180
00181
00182 bundle.relabel();
00183
00184
00185 dtn::data::EID clienteid("api:me");
00186
00187
00188 bundle._source = _eid;
00189
00190 if (bundle._destination == clienteid) bundle._destination = _eid;
00191 if (bundle._reportto == clienteid) bundle._reportto = _eid;
00192 if (bundle._custodian == clienteid) bundle._custodian = _eid;
00193
00194
00195 dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, bundle, true);
00196
00197 yield();
00198 }
00199 } catch (const ibrcommon::ThreadException &ex) {
00200 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00201 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00202 } catch (const ibrcommon::IOException &ex) {
00203 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00204 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00205 } catch (const dtn::InvalidDataException &ex) {
00206 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00207 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00208 } catch (const std::exception &ex) {
00209 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00210 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00211 }
00212 }
00213
00214 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00215 {
00216
00217 dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00218
00219 return conn;
00220 }
00221
00222 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00223 {
00224
00225 conn._sentqueue.push(bundle);
00226
00227
00228 dtn::data::DefaultSerializer(conn._connection) << bundle;
00229
00230
00231 conn._connection << std::flush;
00232
00233 return conn;
00234 }
00235
00236 ClientHandler::Sender::Sender(ClientHandler &client)
00237 : _client(client)
00238 {
00239 }
00240
00241 ClientHandler::Sender::~Sender()
00242 {
00243 }
00244
00245 bool ClientHandler::Sender::__cancellation()
00246 {
00247
00248 this->abort();
00249
00250
00251 return false;
00252 }
00253
00254 void ClientHandler::Sender::run()
00255 {
00256
00257
00258 int oldstate;
00259 ibrcommon::Thread::disableCancel(oldstate);
00260
00261 try {
00262 while (true)
00263 {
00264 dtn::data::Bundle bundle = getnpop(true);
00265
00266
00267 ibrcommon::Thread::CancelProtector cprotect(true);
00268
00269
00270 {
00271 ibrcommon::Thread::CancelProtector cprotect(true);
00272
00273
00274 _client << bundle;
00275 }
00276
00277
00278 yield();
00279 }
00280
00281 } catch (const ibrcommon::QueueUnblockedException &ex) {
00282 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00283 return;
00284 } catch (const ibrcommon::IOException &ex) {
00285 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00286 } catch (const dtn::InvalidDataException &ex) {
00287 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00288 } catch (const std::exception &ex) {
00289 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00290 }
00291
00292 try {
00293 _client.stop();
00294 } catch (const ibrcommon::ThreadException &ex) {
00295 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL;
00296 }
00297 }
00298
00299 void ClientHandler::queue(const dtn::data::Bundle &bundle)
00300 {
00301 _sender.push(bundle);
00302 }
00303 }
00304 }