• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/ClientHandler.cpp

Go to the documentation of this file.
00001 /*
00002  * ClientHandler.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ClientHandler.h"
00009 #include "Configuration.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleCore.h"
00012 #include "net/BundleReceivedEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include <ibrdtn/streams/StreamContactHeader.h>
00015 #include <ibrdtn/data/Serializer.h>
00016 #include <iostream>
00017 #include <ibrcommon/Logger.h>
00018 #include <ibrdtn/data/AgeBlock.h>
00019 #include <ibrdtn/utils/Clock.h>
00020 
00021 #ifdef WITH_BUNDLE_SECURITY
00022 #include "security/SecurityManager.h"
00023 #endif
00024 
00025 using namespace dtn::data;
00026 using namespace dtn::streams;
00027 using namespace dtn::core;
00028 
00029 namespace dtn
00030 {
00031         namespace daemon
00032         {
00033                 ClientHandler::ClientHandler(ApiServerInterface &srv, ibrcommon::tcpstream *stream, size_t i)
00034                  : ibrcommon::DetachedThread(0), id(i), _srv(srv), _sender(*this), _stream(stream), _connection(*this, *_stream)
00035                 {
00036                         _connection.exceptions(std::ios::badbit | std::ios::eofbit);
00037 
00038                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00039                         {
00040                                 stream->enableNoDelay();
00041                         }
00042                 }
00043 
00044                 ClientHandler::~ClientHandler()
00045                 {
00046                         _sender.join();
00047                 }
00048 
00049                 const dtn::data::EID& ClientHandler::getPeer() const
00050                 {
00051                         return _eid;
00052                 }
00053 
00054                 void ClientHandler::eventShutdown(StreamConnection::ConnectionShutdownCases)
00055                 {
00056                 }
00057 
00058                 void ClientHandler::eventTimeout()
00059                 {
00060                 }
00061 
00062                 void ClientHandler::eventError()
00063                 {
00064                 }
00065 
00066                 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00067                 {
00068                         if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00069                         {
00070                                 // This node is working with compressed addresses
00071                                 // generate a number
00072                                 _eid = BundleCore::local + "." + header._localeid.getNode();
00073                         }
00074                         else
00075                         {
00076                                 // contact received event
00077                                 _eid = BundleCore::local + "/" + header._localeid.getNode();
00078                         }
00079 
00080                         _srv.connectionUp(this);
00081                 }
00082 
00083                 void ClientHandler::eventConnectionDown()
00084                 {
00085                         IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00086 
00087                         try {
00088                                 // stop the sender
00089                                 _sender.stop();
00090                         } catch (const ibrcommon::ThreadException &ex) {
00091                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00092                         }
00093                 }
00094 
00095                 void ClientHandler::eventBundleRefused()
00096                 {
00097                         try {
00098                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00099 
00100                                 // set ACK to zero
00101                                 _lastack = 0;
00102 
00103                         } catch (const ibrcommon::QueueUnblockedException&) {
00104                                 // pop on empty queue!
00105                         }
00106                 }
00107 
00108                 void ClientHandler::eventBundleForwarded()
00109                 {
00110                         try {
00111                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00112 
00113                                 // raise bundle event
00114                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00115 
00116                                 // set ACK to zero
00117                                 _lastack = 0;
00118 
00119                         } catch (const ibrcommon::QueueUnblockedException&) {
00120                                 // pop on empty queue!
00121                         }
00122                 }
00123 
00124                 void ClientHandler::eventBundleAck(size_t ack)
00125                 {
00126                         _lastack = ack;
00127                 }
00128 
00129                 void ClientHandler::initialize()
00130                 {
00131                         try {
00132                                 // start the ClientHandler (service)
00133                                 start();
00134                         } catch (const ibrcommon::ThreadException &ex) {
00135                                 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00136                         }
00137                 }
00138 
00139                 void ClientHandler::shutdown()
00140                 {
00141                         // shutdown
00142                         _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00143 
00144                         try {
00145                                 // abort the connection thread
00146                                 this->stop();
00147                         } catch (const ibrcommon::ThreadException &ex) {
00148                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00149                         }
00150                 }
00151 
00152                 void ClientHandler::finally()
00153                 {
00154                         IBRCOMMON_LOGGER_DEBUG(60) << "ClientHandler down" << IBRCOMMON_LOGGER_ENDL;
00155 
00156                         // remove the client from the list in ApiServer
00157                         _srv.connectionDown(this);
00158 
00159                         // close the stream
00160                         (*_stream).close();
00161 
00162                         try {
00163                                 // shutdown the sender thread
00164                                 _sender.stop();
00165                         } catch (const std::exception&) { };
00166                 }
00167 
00168                 void ClientHandler::run()
00169                 {
00170                         try {
00171                                 char flags = 0;
00172 
00173                                 // request acknowledgements
00174                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00175 
00176                                 // do the handshake
00177                                 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00178 
00179                                 // start the sender thread
00180                                 _sender.start();
00181 
00182                                 while (!_connection.eof())
00183                                 {
00184                                         dtn::data::Bundle bundle;
00185                                         dtn::data::DefaultDeserializer(_connection) >> bundle;
00186 
00187                                         // create a new sequence number
00188                                         bundle.relabel();
00189 
00190                                         // check address fields for "api:me", this has to be replaced
00191                                         dtn::data::EID clienteid("api:me");
00192 
00193                                         // set the source address to the sending EID
00194                                         bundle._source = _eid;
00195 
00196                                         if (bundle._destination == clienteid) bundle._destination = _eid;
00197                                         if (bundle._reportto == clienteid) bundle._reportto = _eid;
00198                                         if (bundle._custodian == clienteid) bundle._custodian = _eid;
00199 
00200                                         // if the timestamp is not set, add a ageblock
00201                                         if (bundle._timestamp == 0)
00202                                         {
00203                                                 // check for ageblock
00204                                                 try {
00205                                                         bundle.getBlock<dtn::data::AgeBlock>();
00206                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00207                                                         // add a new ageblock
00208                                                         bundle.push_front<dtn::data::AgeBlock>();
00209                                                 }
00210                                         }
00211 
00212 #ifdef WITH_BUNDLE_SECURITY
00213                                         // if the encrypt bit is set, then try to encrypt the bundle
00214                                         if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT))
00215                                         {
00216                                                 try {
00217                                                         dtn::security::SecurityManager::getInstance().encrypt(bundle);
00218 
00219                                                         bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT, false);
00220                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00221                                                         // sign requested, but no key is available
00222                                                         IBRCOMMON_LOGGER(warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL;
00223                                                 } catch (const dtn::security::SecurityManager::EncryptException&) {
00224                                                         IBRCOMMON_LOGGER(warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL;
00225                                                 }
00226                                         }
00227 
00228                                         // if the sign bit is set, then try to sign the bundle
00229                                         if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN))
00230                                         {
00231                                                 try {
00232                                                         dtn::security::SecurityManager::getInstance().sign(bundle);
00233 
00234                                                         bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN, false);
00235                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00236                                                         // sign requested, but no key is available
00237                                                         IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00238                                                 }
00239                                         }
00240 #endif
00241 
00242                                         // raise default bundle received event
00243                                         dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, bundle, true);
00244 
00245                                         yield();
00246                                 }
00247                         } catch (const ibrcommon::ThreadException &ex) {
00248                                 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00249                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00250                         } catch (const dtn::SerializationFailedException &ex) {
00251                                 IBRCOMMON_LOGGER(error) << "ClientHandler::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00252                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00253                         } catch (const ibrcommon::IOException &ex) {
00254                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00255                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00256                         } catch (const dtn::InvalidDataException &ex) {
00257                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00258                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00259                         } catch (const std::exception &ex) {
00260                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00261                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00262                         }
00263                 }
00264 
00265                 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00266                 {
00267                         // get a bundle
00268                         dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00269 
00270                         return conn;
00271                 }
00272 
00273                 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00274                 {
00275                         // add bundle to the queue
00276                         conn._sentqueue.push(bundle);
00277 
00278                         // transmit the bundle
00279                         dtn::data::DefaultSerializer(conn._connection) << bundle;
00280 
00281                         // mark the end of the bundle
00282                         conn._connection << std::flush;
00283 
00284                         return conn;
00285                 }
00286 
00287                 ClientHandler::Sender::Sender(ClientHandler &client)
00288                  : _client(client)
00289                 {
00290                 }
00291 
00292                 ClientHandler::Sender::~Sender()
00293                 {
00294                 }
00295 
00296                 bool ClientHandler::Sender::__cancellation()
00297                 {
00298                         // cancel the main thread in here
00299                         this->abort();
00300 
00301                         // return false, to signal that further cancel (the hardway) is needed
00302                         return false;
00303                 }
00304 
00305                 void ClientHandler::Sender::run()
00306                 {
00307                         // The queue is not cancel-safe with uclibc, so we need to
00308                         // disable cancel here
00309                         ibrcommon::Thread::CancelProtector __disable_cancellation__(false);
00310 
00311                         try {
00312                                 while (true)
00313                                 {
00314                                         dtn::data::Bundle bundle = getnpop(true);
00315 
00316 #ifdef WITH_BUNDLE_SECURITY
00317                                         // try to decrypt the bundle
00318                                         try {
00319                                                 dtn::security::SecurityManager::getInstance().decrypt(bundle);
00320                                         } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00321                                                 // decrypt needed, but no key is available
00322                                                 IBRCOMMON_LOGGER(warning) << "No key available for decrypt bundle." << IBRCOMMON_LOGGER_ENDL;
00323                                         } catch (const dtn::security::SecurityManager::DecryptException &ex) {
00324                                                 // decrypt failed
00325                                                 IBRCOMMON_LOGGER(warning) << "Decryption of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00326                                         }
00327 #endif
00328 
00329                                         // enable cancellation during transmission
00330                                         {
00331                                                 ibrcommon::Thread::CancelProtector __enable_cancellation__(true);
00332 
00333                                                 // send bundle
00334                                                 _client << bundle;
00335                                         }
00336 
00337                                         // idle a little bit
00338                                         yield();
00339                                 }
00340 
00341                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00342                                 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00343                                 return;
00344                         } catch (const ibrcommon::IOException &ex) {
00345                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00346                         } catch (const dtn::InvalidDataException &ex) {
00347                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00348                         } catch (const std::exception &ex) {
00349                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00350                         }
00351 
00352                         try {
00353                                 _client.stop();
00354                         } catch (const ibrcommon::ThreadException &ex) {
00355                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL;
00356                         }
00357                 }
00358 
00359                 void ClientHandler::queue(const dtn::data::Bundle &bundle)
00360                 {
00361                         _sender.push(bundle);
00362                 }
00363         }
00364 }

Generated on Wed Mar 30 2011 11:11:48 for IBR-DTNSuite by  doxygen 1.7.1