• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/ClientHandler.cpp

Go to the documentation of this file.
00001 /*
00002  * ClientHandler.cpp
00003  *
00004  *  Created on: 24.06.2009
00005  *      Author: morgenro
00006  */
00007 
00008 #include "ClientHandler.h"
00009 #include "Configuration.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleCore.h"
00012 #include "net/BundleReceivedEvent.h"
00013 #include "core/BundleEvent.h"
00014 #include <ibrdtn/streams/StreamContactHeader.h>
00015 #include <ibrdtn/data/Serializer.h>
00016 #include <iostream>
00017 #include <ibrcommon/Logger.h>
00018 
00019 using namespace dtn::data;
00020 using namespace dtn::streams;
00021 using namespace dtn::core;
00022 
00023 namespace dtn
00024 {
00025         namespace daemon
00026         {
00027                 ClientHandler::ClientHandler(ApiServerInterface &srv, ibrcommon::tcpstream *stream, size_t i)
00028                  : ibrcommon::DetachedThread(0), id(i), _srv(srv), _sender(*this), _stream(stream), _connection(*this, *_stream)
00029                 {
00030                         _connection.exceptions(std::ios::badbit | std::ios::eofbit);
00031 
00032                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00033                         {
00034                                 stream->enableNoDelay();
00035                         }
00036                 }
00037 
00038                 ClientHandler::~ClientHandler()
00039                 {
00040                         _sender.join();
00041                 }
00042 
00043                 const dtn::data::EID& ClientHandler::getPeer() const
00044                 {
00045                         return _eid;
00046                 }
00047 
00048                 void ClientHandler::eventShutdown(StreamConnection::ConnectionShutdownCases)
00049                 {
00050                 }
00051 
00052                 void ClientHandler::eventTimeout()
00053                 {
00054                 }
00055 
00056                 void ClientHandler::eventError()
00057                 {
00058                 }
00059 
00060                 void ClientHandler::eventConnectionUp(const StreamContactHeader &header)
00061                 {
00062                         if (BundleCore::local.getScheme() == dtn::data::EID::CBHE_SCHEME)
00063                         {
00064                                 // This node is working with compressed addresses
00065                                 // generate a number
00066                                 _eid = BundleCore::local + "." + header._localeid.getNode();
00067                         }
00068                         else
00069                         {
00070                                 // contact received event
00071                                 _eid = BundleCore::local + "/" + header._localeid.getNode();
00072                         }
00073 
00074                         _srv.connectionUp(this);
00075                 }
00076 
00077                 void ClientHandler::eventConnectionDown()
00078                 {
00079                         IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00080 
00081                         try {
00082                                 // stop the sender
00083                                 _sender.stop();
00084                         } catch (const ibrcommon::ThreadException &ex) {
00085                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00086                         }
00087                 }
00088 
00089                 void ClientHandler::eventBundleRefused()
00090                 {
00091                         try {
00092                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00093 
00094                                 // set ACK to zero
00095                                 _lastack = 0;
00096 
00097                         } catch (ibrcommon::QueueUnblockedException) {
00098                                 // pop on empty queue!
00099                         }
00100                 }
00101 
00102                 void ClientHandler::eventBundleForwarded()
00103                 {
00104                         try {
00105                                 const dtn::data::Bundle bundle = _sentqueue.getnpop();
00106 
00107                                 // raise bundle event
00108                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_DELIVERED);
00109 
00110                                 // set ACK to zero
00111                                 _lastack = 0;
00112 
00113                         } catch (ibrcommon::QueueUnblockedException) {
00114                                 // pop on empty queue!
00115                         }
00116                 }
00117 
00118                 void ClientHandler::eventBundleAck(size_t ack)
00119                 {
00120                         _lastack = ack;
00121                 }
00122 
00123                 void ClientHandler::initialize()
00124                 {
00125                         try {
00126                                 // start the ClientHandler (service)
00127                                 start();
00128                         } catch (const ibrcommon::ThreadException &ex) {
00129                                 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00130                         }
00131                 }
00132 
00133                 void ClientHandler::shutdown()
00134                 {
00135                         // shutdown
00136                         _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00137 
00138                         try {
00139                                 // abort the connection thread
00140                                 this->stop();
00141                         } catch (const ibrcommon::ThreadException &ex) {
00142                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00143                         }
00144                 }
00145 
00146                 void ClientHandler::finally()
00147                 {
00148                         IBRCOMMON_LOGGER_DEBUG(60) << "ClientHandler down" << IBRCOMMON_LOGGER_ENDL;
00149 
00150                         // remove the client from the list in ApiServer
00151                         _srv.connectionDown(this);
00152 
00153                         // close the stream
00154                         (*_stream).close();
00155 
00156                         try {
00157                                 // shutdown the sender thread
00158                                 _sender.stop();
00159                         } catch (std::exception) { };
00160                 }
00161 
00162                 void ClientHandler::run()
00163                 {
00164                         try {
00165                                 char flags = 0;
00166 
00167                                 // request acknowledgements
00168                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00169 
00170                                 // do the handshake
00171                                 _connection.handshake(dtn::core::BundleCore::local, 10, flags);
00172 
00173                                 // start the sender thread
00174                                 _sender.start();
00175 
00176                                 while (true)
00177                                 {
00178                                         dtn::data::Bundle bundle;
00179                                         dtn::data::DefaultDeserializer(_connection) >> bundle;
00180 
00181                                         // create a new sequence number
00182                                         bundle.relabel();
00183 
00184                                         // check address fields for "api:me", this has to be replaced
00185                                         dtn::data::EID clienteid("api:me");
00186 
00187                                         // set the source address to the sending EID
00188                                         bundle._source = _eid;
00189 
00190                                         if (bundle._destination == clienteid) bundle._destination = _eid;
00191                                         if (bundle._reportto == clienteid) bundle._reportto = _eid;
00192                                         if (bundle._custodian == clienteid) bundle._custodian = _eid;
00193 
00194                                         // raise default bundle received event
00195                                         dtn::net::BundleReceivedEvent::raise(dtn::core::BundleCore::local, bundle, true);
00196 
00197                                         yield();
00198                                 }
00199                         } catch (const ibrcommon::ThreadException &ex) {
00200                                 IBRCOMMON_LOGGER(error) << "failed to start thread in ClientHandler\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00201                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00202                         } catch (const ibrcommon::IOException &ex) {
00203                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): IOException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00204                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00205                         } catch (const dtn::InvalidDataException &ex) {
00206                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): InvalidDataException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00207                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00208                         } catch (const std::exception &ex) {
00209                                 IBRCOMMON_LOGGER_DEBUG(10) << "ClientHandler::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00210                                 _connection.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00211                         }
00212                 }
00213 
00214                 ClientHandler& operator>>(ClientHandler &conn, dtn::data::Bundle &bundle)
00215                 {
00216                         // get a bundle
00217                         dtn::data::DefaultDeserializer(conn._connection, dtn::core::BundleCore::getInstance()) >> bundle;
00218 
00219                         return conn;
00220                 }
00221 
00222                 ClientHandler& operator<<(ClientHandler &conn, const dtn::data::Bundle &bundle)
00223                 {
00224                         // add bundle to the queue
00225                         conn._sentqueue.push(bundle);
00226 
00227                         // transmit the bundle
00228                         dtn::data::DefaultSerializer(conn._connection) << bundle;
00229 
00230                         // mark the end of the bundle
00231                         conn._connection << std::flush;
00232 
00233                         return conn;
00234                 }
00235 
00236                 ClientHandler::Sender::Sender(ClientHandler &client)
00237                  : _client(client)
00238                 {
00239                 }
00240 
00241                 ClientHandler::Sender::~Sender()
00242                 {
00243                 }
00244 
00245                 bool ClientHandler::Sender::__cancellation()
00246                 {
00247                         // cancel the main thread in here
00248                         this->abort();
00249 
00250                         // return false, to signal that further cancel (the hardway) is needed
00251                         return false;
00252                 }
00253 
00254                 void ClientHandler::Sender::run()
00255                 {
00256                         // The queue is not cancel-safe with uclibc, so we need to
00257                         // disable cancel here
00258                         int oldstate;
00259                         ibrcommon::Thread::disableCancel(oldstate);
00260 
00261                         try {
00262                                 while (true)
00263                                 {
00264                                         dtn::data::Bundle bundle = getnpop(true);
00265 
00266                                         // enable cancellation during transmission
00267                                         ibrcommon::Thread::CancelProtector cprotect(true);
00268 
00269                                         // enable cancellation during transmission
00270                                         {
00271                                                 ibrcommon::Thread::CancelProtector cprotect(true);
00272 
00273                                                 // send bundle
00274                                                 _client << bundle;
00275                                         }
00276 
00277                                         // idle a little bit
00278                                         yield();
00279                                 }
00280 
00281                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00282                                 IBRCOMMON_LOGGER_DEBUG(40) << "ClientHandler::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00283                                 return;
00284                         } catch (const ibrcommon::IOException &ex) {
00285                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: IOException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00286                         } catch (const dtn::InvalidDataException &ex) {
00287                                 IBRCOMMON_LOGGER_DEBUG(10) << "API: InvalidDataException says " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00288                         } catch (const std::exception &ex) {
00289                                 IBRCOMMON_LOGGER_DEBUG(10) << "unexpected API error! " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00290                         }
00291 
00292                         try {
00293                                 _client.stop();
00294                         } catch (const ibrcommon::ThreadException &ex) {
00295                                 IBRCOMMON_LOGGER_DEBUG(50) << "ClientHandler::Sender::run(): ThreadException (" << ex.what() << ") on termination" << IBRCOMMON_LOGGER_ENDL;
00296                         }
00297                 }
00298 
00299                 void ClientHandler::queue(const dtn::data::Bundle &bundle)
00300                 {
00301                         _sender.push(bundle);
00302                 }
00303         }
00304 }

Generated on Thu Nov 11 2010 09:49:46 for IBR-DTNSuite by  doxygen 1.7.1