Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ClientHandler.h"
00009 #include "Configuration.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleCore.h"
00012 #include "net/BundleReceivedEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include <ibrdtn/streams/StreamContactHeader.h>
00015 #include <ibrdtn/data/Serializer.h>
00016 #include <iostream>
00017 #include <ibrcommon/Logger.h>
00018 #include <ibrdtn/data/AgeBlock.h>
00019 #include <ibrdtn/utils/Clock.h>
00020
00021 #ifdef WITH_BUNDLE_SECURITY
00022 #include "security/SecurityManager.h"
00023 #endif
00024
00025 using namespace dtn::data;
00026 using namespace dtn::streams;
00027 using namespace dtn::core;
00028
00029 namespace dtn
00030 {
00031 namespace daemon
00032 {
00033 ClientHandler::ClientHandler(ApiServerInterface &srv, ibrcommon::tcpstream *stream, size_t i)
00034 : ibrcommon::DetachedThread(0), id(i), _srv(srv), _sender(*this), _stream(stream), _connection(*this, *_stream)
00035 {
00036 _connection.exceptions(std::ios::badbit | std::ios::eofbit);
00037
00038 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00039 {
00040 stream->enableNoDelay();
00041 }
00042 }
00043
00044 ClientHandler::~ClientHandler()
00045 {
00046 _sender.join();
00047 }
00048
00049 const dtn::data::EID& ClientHandler::getPeer() const
00050 {
00051 return _eid;
00052 }
00053
00054 void ClientHandler::eventShutdown(StreamConnection::ConnectionShutdownCases)
00055 {
00056 }
00057
00058 void ClientHandler::eventTimeout()
00059 {
00060 }
00061
00062 void ClientHandler::eventError()
00063 {
00064 }
00065
00066 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00067 {
00068 if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00069 {
00070
00071
00072 _eid = BundleCore::local + "." + header._localeid.getNode();
00073 }
00074 else
00075 {
00076
00077 _eid = BundleCore::local + "/" + header._localeid.getNode();
00078 }
00079
00080 _srv.connectionUp(this);
00081 }
00082
00083 void ClientHandler::eventConnectionDown()
00084 {
00085 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00086
00087 try {
00088
00089 _sender.stop();
00090 } catch (const ibrcommon::ThreadException &ex) {
00091 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00092 }
00093 }
00094
00095 void ClientHandler::eventBundleRefused()
00096 {
00097 try {
00098 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00099
00100
00101 _lastack = 0;
00102
00103 } catch (const ibrcommon::QueueUnblockedException&) {
00104
00105 }
00106 }
00107
00108 void ClientHandler::eventBundleForwarded()
00109 {
00110 try {
00111 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00112
00113
00114 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00115
00116
00117 _lastack = 0;
00118
00119 } catch (const ibrcommon::QueueUnblockedException&) {
00120
00121 }
00122 }
00123
00124 void ClientHandler::eventBundleAck(size_t ack)
00125 {
00126 _lastack = ack;
00127 }
00128
00129 void ClientHandler::initialize()
00130 {
00131 try {
00132
00133 start();
00134 } catch (const ibrcommon::ThreadException &ex) {
00135 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00136 }
00137 }
00138
00139 void ClientHandler::shutdown()
00140 {
00141
00142 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00143
00144 try {
00145
00146 this->stop();
00147 } catch (const ibrcommon::ThreadException &ex) {
00148 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00149 }
00150 }
00151
00152 void ClientHandler::finally()
00153 {
00154 IBRCOMMON_LOGGER_DEBUG(60) << "ClientHandler down" << IBRCOMMON_LOGGER_ENDL;
00155
00156
00157 _srv.connectionDown(this);
00158
00159
00160 (*_stream).close();
00161
00162 try {
00163
00164 _sender.stop();
00165 } catch (const std::exception&) { };
00166 }
00167
00168 void ClientHandler::run()
00169 {
00170 try {
00171 char flags = 0;
00172
00173
00174 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00175
00176
00177 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00178
00179
00180 _sender.start();
00181
00182 while (!_connection.eof())
00183 {
00184 dtn::data::Bundle bundle;
00185 dtn::data::DefaultDeserializer(_connection) >> bundle;
00186
00187
00188 bundle.relabel();
00189
00190
00191 dtn::data::EID clienteid("api:me");
00192
00193
00194 bundle._source = _eid;
00195
00196 if (bundle._destination == clienteid) bundle._destination = _eid;
00197 if (bundle._reportto == clienteid) bundle._reportto = _eid;
00198 if (bundle._custodian == clienteid) bundle._custodian = _eid;
00199
00200
00201 if (bundle._timestamp == 0)
00202 {
00203
00204 try {
00205 bundle.getBlock<dtn::data::AgeBlock>();
00206 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00207
00208 bundle.push_front<dtn::data::AgeBlock>();
00209 }
00210 }
00211
00212 #ifdef WITH_BUNDLE_SECURITY
00213
00214 if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT))
00215 {
00216 try {
00217 dtn::security::SecurityManager::getInstance().encrypt(bundle);
00218
00219 bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT, false);
00220 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00221
00222 IBRCOMMON_LOGGER(warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL;
00223 } catch (const dtn::security::SecurityManager::EncryptException&) {
00224 IBRCOMMON_LOGGER(warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL;
00225 }
00226 }
00227
00228
00229 if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN))
00230 {
00231 try {
00232 dtn::security::SecurityManager::getInstance().sign(bundle);
00233
00234 bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN, false);
00235 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00236
00237 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00238 }
00239 }
00240 #endif
00241
00242
00243 dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, bundle, true);
00244
00245 yield();
00246 }
00247 } catch (const ibrcommon::ThreadException &ex) {
00248 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00249 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00250 } catch (const dtn::SerializationFailedException &ex) {
00251 IBRCOMMON_LOGGER(error) << "ClientHandler::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00252 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00253 } catch (const ibrcommon::IOException &ex) {
00254 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00255 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00256 } catch (const dtn::InvalidDataException &ex) {
00257 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00258 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00259 } catch (const std::exception &ex) {
00260 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00261 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00262 }
00263 }
00264
00265 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00266 {
00267
00268 dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00269
00270 return conn;
00271 }
00272
00273 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00274 {
00275
00276 conn._sentqueue.push(bundle);
00277
00278
00279 dtn::data::DefaultSerializer(conn._connection) << bundle;
00280
00281
00282 conn._connection << std::flush;
00283
00284 return conn;
00285 }
00286
00287 ClientHandler::Sender::Sender(ClientHandler &client)
00288 : _client(client)
00289 {
00290 }
00291
00292 ClientHandler::Sender::~Sender()
00293 {
00294 }
00295
00296 bool ClientHandler::Sender::__cancellation()
00297 {
00298
00299 this->abort();
00300
00301
00302 return false;
00303 }
00304
00305 void ClientHandler::Sender::run()
00306 {
00307
00308
00309 ibrcommon::Thread::CancelProtector __disable_cancellation__(false);
00310
00311 try {
00312 while (true)
00313 {
00314 dtn::data::Bundle bundle = getnpop(true);
00315
00316 #ifdef WITH_BUNDLE_SECURITY
00317
00318 try {
00319 dtn::security::SecurityManager::getInstance().decrypt(bundle);
00320 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00321
00322 IBRCOMMON_LOGGER(warning) << "No key available for decrypt bundle." << IBRCOMMON_LOGGER_ENDL;
00323 } catch (const dtn::security::SecurityManager::DecryptException &ex) {
00324
00325 IBRCOMMON_LOGGER(warning) << "Decryption of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00326 }
00327 #endif
00328
00329
00330 {
00331 ibrcommon::Thread::CancelProtector __enable_cancellation__(true);
00332
00333
00334 _client << bundle;
00335 }
00336
00337
00338 yield();
00339 }
00340
00341 } catch (const ibrcommon::QueueUnblockedException &ex) {
00342 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00343 return;
00344 } catch (const ibrcommon::IOException &ex) {
00345 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00346 } catch (const dtn::InvalidDataException &ex) {
00347 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00348 } catch (const std::exception &ex) {
00349 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00350 }
00351
00352 try {
00353 _client.stop();
00354 } catch (const ibrcommon::ThreadException &ex) {
00355 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL;
00356 }
00357 }
00358
00359 void ClientHandler::queue(const dtn::data::Bundle &bundle)
00360 {
00361 _sender.push(bundle);
00362 }
00363 }
00364 }