IBR-DTNSuite 0.6

daemon/src/api/ClientHandler.cpp

Go to the documentation of this file.
00001 /*
00002  * ClientHandler.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "ClientHandler.h"
00010 #include "Configuration.h"
00011 #include "core/GlobalEvent.h"
00012 #include "core/BundleCore.h"
00013 #include "net/BundleReceivedEvent.h"
00014 #include "core/BundleEvent.h"
00015 #include <ibrdtn/streams/StreamContactHeader.h>
00016 #include <ibrdtn/data/Serializer.h>
00017 #include <iostream>
00018 #include <ibrcommon/Logger.h>
00019 #include <ibrdtn/data/AgeBlock.h>
00020 #include <ibrdtn/utils/Clock.h>
00021 
00022 #ifdef WITH_BUNDLE_SECURITY
00023 #include "security/SecurityManager.h"
00024 #endif
00025 
00026 #ifdef WITH_COMPRESSION
00027 #include <ibrdtn/data/CompressedPayloadBlock.h>
00028 #endif
00029 
00030 using namespace dtn::data;
00031 using namespace dtn::streams;
00032 using namespace dtn::core;
00033 
00034 namespace dtn
00035 {
00036         namespace api
00037         {
00038                 ClientHandler::ClientHandler(ApiServerInterface &srv, ibrcommon::tcpstream *stream, size_t i)
00039                  : ibrcommon::DetachedThread(0), id(i), _srv(srv), _sender(*this), _stream(stream), _connection(*this, *_stream)
00040                 {
00041                         _connection.exceptions(std::ios::badbit | std::ios::eofbit);
00042 
00043                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00044                         {
00045                                 stream->enableNoDelay();
00046                         }
00047                 }
00048 
00049                 ClientHandler::~ClientHandler()
00050                 {
00051                         _sender.join();
00052                 }
00053 
00054                 const dtn::data::EID& ClientHandler::getPeer() const
00055                 {
00056                         return _eid;
00057                 }
00058 
00059                 void ClientHandler::eventShutdown(StreamConnection::ConnectionShutdownCases)
00060                 {
00061                 }
00062 
00063                 void ClientHandler::eventTimeout()
00064                 {
00065                 }
00066 
00067                 void ClientHandler::eventError()
00068                 {
00069                 }
00070 
00071                 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00072                 {
00073                         if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00074                         {
00075                                 // This node is working with compressed addresses
00076                                 // generate a number
00077                                 _eid = BundleCore::local + "." + header._localeid.getHost();
00078                         }
00079                         else
00080                         {
00081                                 // contact received event
00082                                 _eid = BundleCore::local + "/" + header._localeid.getHost() + header._localeid.getApplication();
00083                         }
00084 
00085                         IBRCOMMON_LOGGER_DEBUG(20) << "new client connected: " << _eid.getString() << IBRCOMMON_LOGGER_ENDL;
00086 
00087                         _srv.connectionUp(this);
00088                 }
00089 
00090                 void ClientHandler::eventConnectionDown()
00091                 {
00092                         IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00093 
00094                         try {
00095                                 // stop the sender
00096                                 _sender.stop();
00097                         } catch (const ibrcommon::ThreadException &ex) {
00098                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00099                         }
00100                 }
00101 
00102                 void ClientHandler::eventBundleRefused()
00103                 {
00104                         try {
00105                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00106 
00107                                 // set ACK to zero
00108                                 _lastack = 0;
00109 
00110                         } catch (const ibrcommon::QueueUnblockedException&) {
00111                                 // pop on empty queue!
00112                         }
00113                 }
00114 
00115                 void ClientHandler::eventBundleForwarded()
00116                 {
00117                         try {
00118                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00119 
00120                                 // raise bundle event
00121                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00122 
00123                                 // set ACK to zero
00124                                 _lastack = 0;
00125 
00126                         } catch (const ibrcommon::QueueUnblockedException&) {
00127                                 // pop on empty queue!
00128                         }
00129                 }
00130 
00131                 void ClientHandler::eventBundleAck(size_t ack)
00132                 {
00133                         _lastack = ack;
00134                 }
00135 
00136                 void ClientHandler::initialize()
00137                 {
00138                         try {
00139                                 // start the ClientHandler (service)
00140                                 start();
00141                         } catch (const ibrcommon::ThreadException &ex) {
00142                                 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00143                         }
00144                 }
00145 
00146                 void ClientHandler::shutdown()
00147                 {
00148                         // shutdown
00149                         _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00150 
00151                         try {
00152                                 // abort the connection thread
00153                                 this->stop();
00154                         } catch (const ibrcommon::ThreadException &ex) {
00155                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00156                         }
00157                 }
00158 
00159                 bool ClientHandler::__cancellation()
00160                 {
00161                         // close the stream
00162                         try {
00163                                 (*_stream).close();
00164                         } catch (const ibrcommon::ConnectionClosedException&) { };
00165 
00166                         return true;
00167                 }
00168 
00169                 void ClientHandler::finally()
00170                 {
00171                         IBRCOMMON_LOGGER_DEBUG(60) << "ClientHandler down" << IBRCOMMON_LOGGER_ENDL;
00172 
00173                         // remove the client from the list in ApiServer
00174                         _srv.connectionDown(this);
00175 
00176                         // close the stream
00177                         try {
00178                                 (*_stream).close();
00179                         } catch (const ibrcommon::ConnectionClosedException&) { };
00180 
00181                         try {
00182                                 // shutdown the sender thread
00183                                 _sender.stop();
00184                         } catch (const std::exception&) { };
00185                 }
00186 
00187                 void ClientHandler::run()
00188                 {
00189                         try {
00190                                 char flags = 0;
00191 
00192                                 // request acknowledgements
00193                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00194 
00195                                 // do the handshake
00196                                 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00197 
00198                                 // start the sender thread
00199                                 _sender.start();
00200 
00201                                 while (!_connection.eof())
00202                                 {
00203                                         dtn::data::Bundle bundle;
00204                                         dtn::data::DefaultDeserializer(_connection) >> bundle;
00205 
00206                                         // create a new sequence number
00207                                         bundle.relabel();
00208 
00209                                         // check address fields for "api:me", this has to be replaced
00210                                         dtn::data::EID clienteid("api:me");
00211 
00212                                         // set the source address to the sending EID
00213                                         bundle._source = _eid;
00214 
00215                                         if (bundle._destination == clienteid) bundle._destination = _eid;
00216                                         if (bundle._reportto == clienteid) bundle._reportto = _eid;
00217                                         if (bundle._custodian == clienteid) bundle._custodian = _eid;
00218 
00219                                         // if the timestamp is not set, add a ageblock
00220                                         if (bundle._timestamp == 0)
00221                                         {
00222                                                 // check for ageblock
00223                                                 try {
00224                                                         bundle.getBlock<dtn::data::AgeBlock>();
00225                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) {
00226                                                         // add a new ageblock
00227                                                         bundle.push_front<dtn::data::AgeBlock>();
00228                                                 }
00229                                         }
00230 
00231 #ifdef WITH_COMPRESSION
00232                                         // if the compression bit is set, then compress the bundle
00233                                         if (bundle.get(dtn::data::PrimaryBlock::IBRDTN_REQUEST_COMPRESSION))
00234                                         {
00235                                                 try {
00236                                                         dtn::data::CompressedPayloadBlock::compress(bundle, dtn::data::CompressedPayloadBlock::COMPRESSION_ZLIB);
00237                                                 } catch (const ibrcommon::Exception &ex) {
00238                                                         IBRCOMMON_LOGGER(warning) << "compression of bundle failed: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00239                                                 };
00240                                         }
00241 #endif
00242 
00243 #ifdef WITH_BUNDLE_SECURITY
00244                                         // if the encrypt bit is set, then try to encrypt the bundle
00245                                         if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT))
00246                                         {
00247                                                 try {
00248                                                         dtn::security::SecurityManager::getInstance().encrypt(bundle);
00249 
00250                                                         bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_ENCRYPT, false);
00251                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00252                                                         // sign requested, but no key is available
00253                                                         IBRCOMMON_LOGGER(warning) << "No key available for encrypt process." << IBRCOMMON_LOGGER_ENDL;
00254                                                 } catch (const dtn::security::SecurityManager::EncryptException&) {
00255                                                         IBRCOMMON_LOGGER(warning) << "Encryption of bundle failed." << IBRCOMMON_LOGGER_ENDL;
00256                                                 }
00257                                         }
00258 
00259                                         // if the sign bit is set, then try to sign the bundle
00260                                         if (bundle.get(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN))
00261                                         {
00262                                                 try {
00263                                                         dtn::security::SecurityManager::getInstance().sign(bundle);
00264 
00265                                                         bundle.set(dtn::data::PrimaryBlock::DTNSEC_REQUEST_SIGN, false);
00266                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00267                                                         // sign requested, but no key is available
00268                                                         IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00269                                                 }
00270                                         }
00271 #endif
00272 
00273                                         // raise default bundle received event
00274                                         dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, bundle, true);
00275 
00276                                         yield();
00277                                 }
00278                         } catch (const ibrcommon::ThreadException &ex) {
00279                                 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00280                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00281                         } catch (const dtn::SerializationFailedException &ex) {
00282                                 IBRCOMMON_LOGGER(error) << "ClientHandler::run(): SerializationFailedException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00283                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00284                         } catch (const ibrcommon::IOException &ex) {
00285                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00286                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00287                         } catch (const dtn::InvalidDataException &ex) {
00288                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00289                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00290                         } catch (const std::exception &ex) {
00291                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00292                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00293                         }
00294                 }
00295 
00296                 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00297                 {
00298                         // get a bundle
00299                         dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00300 
00301                         return conn;
00302                 }
00303 
00304                 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00305                 {
00306                         // add bundle to the queue
00307                         conn._sentqueue.push(bundle);
00308 
00309                         // transmit the bundle
00310                         dtn::data::DefaultSerializer(conn._connection) << bundle;
00311 
00312                         // mark the end of the bundle
00313                         conn._connection << std::flush;
00314 
00315                         return conn;
00316                 }
00317 
00318                 bool ClientHandler::good() const
00319                 {
00320                         return _stream->good();
00321                 }
00322 
00323                 ClientHandler::Sender::Sender(ClientHandler &client)
00324                  : _client(client)
00325                 {
00326                 }
00327 
00328                 ClientHandler::Sender::~Sender()
00329                 {
00330                 }
00331 
00332                 bool ClientHandler::Sender::__cancellation()
00333                 {
00334                         // cancel the main thread in here
00335                         this->abort();
00336 
00337                         return true;
00338                 }
00339 
00340                 void ClientHandler::Sender::run()
00341                 {
00342                         try {
00343                                 while (_client.good())
00344                                 {
00345                                         dtn::data::Bundle bundle = getnpop(true);
00346 
00347                                         // process the bundle block (security, compression, ...)
00348                                         dtn::core::BundleCore::processBlocks(bundle);
00349 
00350                                         // send bundle
00351                                         _client << bundle;
00352 
00353                                         // idle a little bit
00354                                         yield();
00355                                 }
00356                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00357                                 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00358                                 return;
00359                         } catch (const ibrcommon::IOException &ex) {
00360                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00361                         } catch (const dtn::InvalidDataException &ex) {
00362                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00363                         } catch (const std::exception &ex) {
00364                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00365                         }
00366 
00367                         try {
00368                                 _client.stop();
00369                         } catch (const ibrcommon::ThreadException &ex) {
00370                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL;
00371                         }
00372                 }
00373 
00374                 void ClientHandler::queue(const dtn::data::Bundle &bundle)
00375                 {
00376                         _sender.push(bundle);
00377                 }
00378         }
00379 }