// IBR-DTNSuite 0.6
00001 /* 00002 * ClientHandler.cpp 00003 * 00004 * Created on: 24.06.2009 00005 * Author: morgenro 00006 */ 00007 00008 #include "config.h" 00009 #include "ClientHandler.h" 00010 #include "Configuration.h" 00011 #include "core/GlobalEvent.h" 00012 #include "core/BundleCore.h" 00013 #include "net/BundleReceivedEvent.h" 00014 #include "core/BundleEvent.h" 00015 #include <ibrdtn/streams/StreamContactHeader.h> 00016 #include <ibrdtn/data/Serializer.h> 00017 #include <iostream> 00018 #include <ibrcommon/Logger.h> 00019 #include <ibrdtn/data/AgeBlock.h> 00020 #include <ibrdtn/utils/Clock.h> 00021 00022 #ifdef WITH_BUNDLE_SECURITY 00023 #include "security/SecurityManager.h" 00024 #endif 00025 00026 #ifdef WITH_COMPRESSION 00027 #include <ibrdtn/data/CompressedPayloadBlock.h> 00028 #endif 00029 00030 using namespace dtn::data; 00031 using namespace dtn::streams; 00032 using namespace dtn::core; 00033 00034 namespace dtn 00035 { 00036 namespace api 00037 { 00038 ClientHandler::ClientHandler(ApiServerInterface &srv, ibrcommon::tcpstream *stream, size_t i) 00039 : ibrcommon::DetachedThread(0), id(i), _srv(srv), _sender(*this), _stream(stream), _connection(*this, *_stream) 00040 { 00041 _connection.exceptions(std::ios::badbit | std::ios::eofbit); 00042 00043 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() ) 00044 { 00045 stream->enableNoDelay(); 00046 } 00047 } 00048 00049 ClientHandler::~ClientHandler() 00050 { 00051 _sender.join(); 00052 } 00053 00054 const dtn::data::EID& ClientHandler::getPeer() const 00055 { 00056 return _eid; 00057 } 00058 00059 void ClientHandler::eventShutdown(StreamConnection::ConnectionShutdownCases) 00060 { 00061 } 00062 00063 void ClientHandler::eventTimeout() 00064 { 00065 } 00066 00067 void ClientHandler::eventError() 00068 { 00069 } 00070 00071 void ClientHandler::eventConnectionUp(const StreamContactHeader &header) 00072 { 00073 if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME) 00074 { 00075 // 
This node is working with compressed addresses 00076 // generate a number 00077 _eid = BundleCore::local + "." + header._localeid.getHost(); 00078 } 00079 else 00080 { 00081 // contact received event 00082 _eid = BundleCore::local + "/" + header._localeid.getHost() + header._localeid.getApplication(); 00083 } 00084 00085 IBRCOMMON_LOGGER_DEBUG(20) << "new client connected: " << _eid.getString() << IBRCOMMON_LOGGER_ENDL; 00086 00087 _srv.connectionUp(this); 00088 } 00089 00090 void ClientHandler::eventConnectionDown() 00091 { 00092 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL; 00093 00094 try { 00095 // stop the sender 00096 _sender.stop(); 00097 } catch (const ibrcommon::ThreadException &ex) { 00098 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00099 } 00100 } 00101 00102 void ClientHandler::eventBundleRefused() 00103 { 00104 try { 00105 const dtn::data::Bundle bundle = _sentqueue.getnpop(); 00106 00107 // set ACK to zero 00108 _lastack = 0; 00109 00110 } catch (const ibrcommon::QueueUnblockedException&) { 00111 // pop on empty queue! 00112 } 00113 } 00114 00115 void ClientHandler::eventBundleForwarded() 00116 { 00117 try { 00118 const dtn::data::Bundle bundle = _sentqueue.getnpop(); 00119 00120 // raise bundle event 00121 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED); 00122 00123 // set ACK to zero 00124 _lastack = 0; 00125 00126 } catch (const ibrcommon::QueueUnblockedException&) { 00127 // pop on empty queue! 
00128 } 00129 } 00130 00131 void ClientHandler::eventBundleAck(size_t ack) 00132 { 00133 _lastack = ack; 00134 } 00135 00136 void ClientHandler::initialize() 00137 { 00138 try { 00139 // start the ClientHandler (service) 00140 start(); 00141 } catch (const ibrcommon::ThreadException &ex) { 00142 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00143 } 00144 } 00145 00146 void ClientHandler::shutdown() 00147 { 00148 // shutdown 00149 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00150 00151 try { 00152 // abort the connection thread 00153 this->stop(); 00154 } catch (const ibrcommon::ThreadException &ex) { 00155 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00156 } 00157 } 00158 00159 bool ClientHandler::__cancellation() 00160 { 00161 // close the stream 00162 try { 00163 (*_stream).close(); 00164 } catch (const ibrcommon::ConnectionClosedException&) { }; 00165 00166 return true; 00167 } 00168 00169 void ClientHandler::finally() 00170 { 00171 IBRCOMMON_LOGGER_DEBUG(60) << "ClientHandler down" << IBRCOMMON_LOGGER_ENDL; 00172 00173 // remove the client from the list in ApiServer 00174 _srv.connectionDown(this); 00175 00176 // close the stream 00177 try { 00178 (*_stream).close(); 00179 } catch (const ibrcommon::ConnectionClosedException&) { }; 00180 00181 try { 00182 // shutdown the sender thread 00183 _sender.stop(); 00184 } catch (const std::exception&) { }; 00185 } 00186 00187 void ClientHandler::run() 00188 { 00189 try { 00190 char flags = 0; 00191 00192 // request acknowledgements 00193 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS; 00194 00195 // do the handshake 00196 _connection.handshake(dtn::core::BundleCore::local, 10, flags); 00197 00198 // start the sender thread 00199 _sender.start(); 00200 00201 while (!_connection.eof()) 00202 { 00203 dtn::data::Bundle bundle; 00204 
dtn::data::DefaultDeserializer(_connection) >> bundle; 00205 00206 // create a new sequence number 00207 bundle.relabel(); 00208 00209 // check address fields for "api:me", this has to be replaced 00210 dtn::data::EID clienteid("api:me"); 00211 00212 // set the source address to the sending EID 00213 bundle._source = _eid; 00214 00215 if (bundle._destination == clienteid) bundle._destination = _eid; 00216 if (bundle._reportto == clienteid) bundle._reportto = _eid; 00217 if (bundle._custodian == clienteid) bundle._custodian = _eid; 00218 00219 // if the timestamp is not set, add a ageblock 00220 if (bundle._timestamp == 0) 00221 { 00222 // check for ageblock 00223 try { 00224 bundle.getBlock<dtn::data::AgeBlock>(); 00225 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { 00226 // add a new ageblock 00227 bundle.push_front<dtn::data::AgeBlock>(); 00228 } 00229 } 00230 00231 #ifdef WITH_COMPRESSION 00232 // if the compression bit is set, then compress the bundle 00233 if (bundle.get(dtn::data::PrimaryBlock::IBRDTN_REQUEST_COMPRESSION)) 00234 { 00235 try { 00236 dtn::data::CompressedPayloadBlock::compress(bundle, dtn::data::CompressedPayloadBlock::COMPRESSION_ZLIB); 00237 } catch (const ibrcommon::Exception &ex) { 00238 IBRCOMMON_LOGGER(warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00239 }; 00240 } 00241 #endif 00242 00243 #ifdef WITH_BUNDLE_SECURITY 00244 // if the encrypt bit is set, then try to encrypt the bundle 00245 if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT)) 00246 { 00247 try { 00248 dtn::security::SecurityManager::getInstance().encrypt(bundle); 00249 00250 bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT, false); 00251 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00252 // sign requested, but no key is available 00253 IBRCOMMON_LOGGER(warning) << "No key available for encrypt process." 
<< IBRCOMMON_LOGGER_ENDL; 00254 } catch (const dtn::security::SecurityManager::EncryptException&) { 00255 IBRCOMMON_LOGGER(warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL; 00256 } 00257 } 00258 00259 // if the sign bit is set, then try to sign the bundle 00260 if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN)) 00261 { 00262 try { 00263 dtn::security::SecurityManager::getInstance().sign(bundle); 00264 00265 bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN, false); 00266 } catch (const dtn::security::SecurityManager::KeyMissingException&) { 00267 // sign requested, but no key is available 00268 IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL; 00269 } 00270 } 00271 #endif 00272 00273 // raise default bundle received event 00274 dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, bundle, true); 00275 00276 yield(); 00277 } 00278 } catch (const ibrcommon::ThreadException &ex) { 00279 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL; 00280 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00281 } catch (const dtn::SerializationFailedException &ex) { 00282 IBRCOMMON_LOGGER(error) << "ClientHandler::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00283 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00284 } catch (const ibrcommon::IOException &ex) { 00285 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00286 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00287 } catch (const dtn::InvalidDataException &ex) { 00288 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00289 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00290 } catch (const std::exception &ex) { 00291 
IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL; 00292 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR); 00293 } 00294 } 00295 00296 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle) 00297 { 00298 // get a bundle 00299 dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle; 00300 00301 return conn; 00302 } 00303 00304 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle) 00305 { 00306 // add bundle to the queue 00307 conn._sentqueue.push(bundle); 00308 00309 // transmit the bundle 00310 dtn::data::DefaultSerializer(conn._connection) << bundle; 00311 00312 // mark the end of the bundle 00313 conn._connection << std::flush; 00314 00315 return conn; 00316 } 00317 00318 bool ClientHandler::good() const 00319 { 00320 return _stream->good(); 00321 } 00322 00323 ClientHandler::Sender::Sender(ClientHandler &client) 00324 : _client(client) 00325 { 00326 } 00327 00328 ClientHandler::Sender::~Sender() 00329 { 00330 } 00331 00332 bool ClientHandler::Sender::__cancellation() 00333 { 00334 // cancel the main thread in here 00335 this->abort(); 00336 00337 return true; 00338 } 00339 00340 void ClientHandler::Sender::run() 00341 { 00342 try { 00343 while (_client.good()) 00344 { 00345 dtn::data::Bundle bundle = getnpop(true); 00346 00347 // process the bundle block (security, compression, ...) 
00348 dtn::core::BundleCore::processBlocks(bundle); 00349 00350 // send bundle 00351 _client << bundle; 00352 00353 // idle a little bit 00354 yield(); 00355 } 00356 } catch (const ibrcommon::QueueUnblockedException &ex) { 00357 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL; 00358 return; 00359 } catch (const ibrcommon::IOException &ex) { 00360 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00361 } catch (const dtn::InvalidDataException &ex) { 00362 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00363 } catch (const std::exception &ex) { 00364 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL; 00365 } 00366 00367 try { 00368 _client.stop(); 00369 } catch (const ibrcommon::ThreadException &ex) { 00370 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL; 00371 } 00372 } 00373 00374 void ClientHandler::queue(const dtn::data::Bundle &bundle) 00375 { 00376 _sender.push(bundle); 00377 } 00378 } 00379 }