• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/TCPConnection.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConnection.cpp
00003  *
00004  *  Created on: 26.04.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "core/BundleCore.h"
00010 #include "core/GlobalEvent.h"
00011 #include "core/BundleEvent.h"
00012 #include "core/BundleStorage.h"
00013 #include "Configuration.h"
00014 
00015 #include "net/TCPConvergenceLayer.h"
00016 #include "net/BundleReceivedEvent.h"
00017 #include "net/ConnectionEvent.h"
00018 #include "net/TransferCompletedEvent.h"
00019 #include "net/TransferAbortedEvent.h"
00020 #include "routing/RequeueBundleEvent.h"
00021 
00022 #include <ibrcommon/net/tcpclient.h>
00023 #include <ibrcommon/TimeMeasurement.h>
00024 #include <ibrcommon/net/vinterface.h>
00025 #include <ibrcommon/Logger.h>
00026 
00027 #include <iostream>
00028 #include <iomanip>
00029 
00030 #ifdef WITH_BUNDLE_SECURITY
00031 #include "security/SecurityManager.h"
00032 #endif
00033 
00034 namespace dtn
00035 {
00036         namespace net
00037         {
00038                 /*
00039                  * class TCPConnection
00040                  */
00041                 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout)
00042                  : _peer(), _node(Node::NODE_CONNECTED), _tcpstream(stream), _stream(*this, *stream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00043                    _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv)
00044                 {
00045                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00046 
00047                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00048                         {
00049                                 stream->enableNoDelay();
00050                         }
00051 
00052                         stream->enableLinger(10);
00053                         stream->enableKeepalive();
00054                         _node.setProtocol(Node::CONN_TCPIP);
00055                 }
00056 
00057                 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout)
00058                  : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00059                    _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv)
00060                 {
00061                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00062                 }
00063 
00064                 TCPConnection::~TCPConnection()
00065                 {
00066                 }
00067 
00068                 void TCPConnection::queue(const dtn::data::BundleID &bundle)
00069                 {
00070                         _sender.push(bundle);
00071                 }
00072 
00073                 const StreamContactHeader& TCPConnection::getHeader() const
00074                 {
00075                         return _peer;
00076                 }
00077 
00078                 const dtn::core::Node& TCPConnection::getNode() const
00079                 {
00080                         return _node;
00081                 }
00082 
00083                 void TCPConnection::rejectTransmission()
00084                 {
00085                         _stream.reject();
00086                 }
00087 
00088                 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases)
00089                 {
00090                 }
00091 
00092                 void TCPConnection::eventTimeout()
00093                 {
00094                         // event
00095                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00096 
00097                         // stop the receiver thread
00098                         this->stop();
00099                 }
00100 
00101                 void TCPConnection::eventError()
00102                 {
00103                 }
00104 
00105                 void TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00106                 {
00107                         _peer = header;
00108                         _node.setEID(header._localeid);
00109                         _keepalive_timeout = header._keepalive * 1000;
00110 
00111                         // raise up event
00112                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00113                 }
00114 
00115                 void TCPConnection::eventConnectionDown()
00116                 {
00117                         IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00118 
00119                         try {
00120                                 // stop the sender
00121                                 _sender.stop();
00122                         } catch (const ibrcommon::ThreadException &ex) {
00123                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00124                         }
00125 
00126                         if (_peer._localeid != dtn::data::EID())
00127                         {
00128                                 // event
00129                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00130                         }
00131                 }
00132 
00133                 void TCPConnection::eventBundleRefused()
00134                 {
00135                         try {
00136                                 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00137 
00138                                 // requeue the bundle
00139                                 TransferAbortedEvent::raise(EID(_node.getURI()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00140 
00141                                 // set ACK to zero
00142                                 _lastack = 0;
00143 
00144                         } catch (const ibrcommon::QueueUnblockedException&) {
00145                                 // pop on empty queue!
00146                                 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00147                         }
00148                 }
00149 
00150                 void TCPConnection::eventBundleForwarded()
00151                 {
00152                         try {
00153                                 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00154 
00155                                 // signal completion of the transfer
00156                                 TransferCompletedEvent::raise(EID(_node.getURI()), bundle);
00157 
00158                                 // raise bundle event
00159                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00160 
00161                                 // set ACK to zero
00162                                 _lastack = 0;
00163                         } catch (const ibrcommon::QueueUnblockedException&) {
00164                                 // pop on empty queue!
00165                                 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00166                         }
00167                 }
00168 
00169                 void TCPConnection::eventBundleAck(size_t ack)
00170                 {
00171                         _lastack = ack;
00172                 }
00173 
00174                 void TCPConnection::initialize()
00175                 {
00176                         // start the receiver for incoming bundles + handshake
00177                         try {
00178                                 start();
00179                         } catch (const ibrcommon::ThreadException &ex) {
00180                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00181                         }
00182                 }
00183 
00184                 void TCPConnection::shutdown()
00185                 {
00186                         // shutdown
00187                         _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00188 
00189                         try {
00190                                 // abort the connection thread
00191                                 this->stop();
00192                         } catch (const ibrcommon::ThreadException &ex) {
00193                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00194                         }
00195                 }
00196 
00197                 void TCPConnection::finally()
00198                 {
00199                         IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00200 
00201                         // close the tcpstream
00202                         try {
00203                                 _tcpstream->close();
00204                         } catch (const ibrcommon::ConnectionClosedException&) { };
00205 
00206                         try {
00207                                 // shutdown the sender thread
00208                                 _sender.stop();
00209                         } catch (const std::exception&) { };
00210 
00211                         try {
00212                                 _callback.connectionDown(this);
00213                         } catch (const ibrcommon::MutexException&) { };
00214 
00215                         // clear the queue
00216                         clearQueue();
00217                 }
00218 
00219                 void TCPConnection::setup()
00220                 {
00221                         // try to connect to the other side
00222                         try {
00223                                 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream);
00224                                 client.open(_node.getAddress(), _node.getPort());
00225 
00226                                 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00227                                 {
00228                                         _tcpstream->enableNoDelay();
00229                                 }
00230 
00231                                 _tcpstream->enableLinger(10);
00232                                 _tcpstream->enableKeepalive();
00233                                 _node.setProtocol(Node::CONN_TCPIP);
00234                         } catch (const ibrcommon::tcpclient::SocketException&) {
00235                                 // error on open, requeue all bundles in the queue
00236                                 IBRCOMMON_LOGGER(warning) << "connection to " << _node.getURI() << " (" << _node.getAddress() << ":" << _node.getPort() << ") failed" << IBRCOMMON_LOGGER_ENDL;
00237                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00238                                 throw;
00239                         } catch (const bad_cast&) { };
00240                 }
00241 
00242                 void TCPConnection::run()
00243                 {
00244                         try {
00245                                 // do the handshake
00246                                 char flags = 0;
00247 
00248                                 // enable ACKs and NACKs
00249                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00250                                 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00251 
00252                                 // do the handshake
00253                                 _stream.handshake(_name, _timeout, flags);
00254 
00255                                 // start the sender
00256                                 _sender.start();
00257 
00258                                 while (!_stream.eof())
00259                                 {
00260                                         try {
00261                                                 // create a new empty bundle
00262                                                 dtn::data::Bundle bundle;
00263 
00264                                                 // deserialize the bundle
00265                                                 (*this) >> bundle;
00266 
00267                                                 // check the bundle
00268                                                 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00269                                                 {
00270                                                         // invalid bundle!
00271                                                         throw dtn::data::Validator::RejectedException("destination or source EID is null");
00272                                                 }
00273 
00274                                                 // raise default bundle received event
00275                                                 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle);
00276                                         }
00277                                         catch (const dtn::data::Validator::RejectedException &ex)
00278                                         {
00279                                                 // bundle rejected
00280                                                 rejectTransmission();
00281 
00282                                                 // display the rejection
00283                                                 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00284                                         }
00285                                         catch (const dtn::InvalidDataException &ex) {
00286                                                 // bundle rejected
00287                                                 rejectTransmission();
00288 
00289                                                 // display the rejection
00290                                                 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00291                                         }
00292 
00293                                         yield();
00294                                 }
00295                         } catch (const ibrcommon::ThreadException &ex) {
00296                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00297                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00298                         } catch (const std::exception &ex) {
00299                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00300                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00301                         }
00302                 }
00303 
00304 
00305                 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle)
00306                 {
00307                         std::iostream &stream = conn._stream;
00308 
00309                         // check if the stream is still good
00310                         if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00311 
00312                         dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle;
00313                         return conn;
00314                 }
00315 
00316                 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle)
00317                 {
00318                         // prepare a measurement
00319                         ibrcommon::TimeMeasurement m;
00320 
00321                         std::iostream &stream = conn._stream;
00322 
00323                         // create a serializer
00324                         dtn::data::DefaultSerializer serializer(stream);
00325 
00326                         // put the bundle into the sentqueue
00327                         conn._sentqueue.push(bundle);
00328 
00329                         // start the measurement
00330                         m.start();
00331 
00332                         try {
00333                                 // activate exceptions for this method
00334                                 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00335 
00336                                 // transmit the bundle
00337                                 serializer << bundle;
00338 
00339                                 // flush the stream
00340                                 stream << std::flush;
00341 
00342                                 // stop the time measurement
00343                                 m.stop();
00344 
00345                                 // get throughput
00346                                 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00347 
00348                                 // print out throughput
00349                                 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00350                                                 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00351 
00352                         } catch (const ibrcommon::Exception &ex) {
00353                                 // the connection not available
00354                                 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00355 
00356                                 // forward exception
00357                                 throw;
00358                         }
00359 
00360                         return conn;
00361                 }
00362 
00363                 TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout)
00364                  : _connection(connection), _keepalive_timeout(keepalive_timeout)
00365                 {
00366                 }
00367 
00368                 TCPConnection::Sender::~Sender()
00369                 {
00370                         ibrcommon::JoinableThread::join();
00371                 }
00372 
00373                 bool TCPConnection::Sender::__cancellation()
00374                 {
00375                         // cancel the main thread in here
00376                         ibrcommon::Queue<dtn::data::BundleID>::abort();
00377 
00378                         // return false, to signal that further cancel (the hardway) is needed
00379                         return false;
00380                 }
00381 
00382                 void TCPConnection::Sender::run()
00383                 {
00384                         // The queue is not cancel-safe with uclibc, so we need to
00385                         // disable cancel here
00386                         ibrcommon::Thread::CancelProtector __disable_cancellation__(false);
00387 
00388                         try {
00389                                 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00390 
00391                                 while (true)
00392                                 {
00393                                         try {
00394                                                 _current_transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true, _keepalive_timeout);
00395 
00396                                                 try {
00397                                                         // read the bundle out of the storage
00398                                                         dtn::data::Bundle bundle = storage.get(_current_transfer);
00399 
00400 #ifdef WITH_BUNDLE_SECURITY
00401                                                         const dtn::daemon::Configuration::Security::Level seclevel =
00402                                                                         dtn::daemon::Configuration::getInstance().getSecurity().getLevel();
00403 
00404                                                         if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED)
00405                                                         {
00406                                                                 try {
00407                                                                         dtn::security::SecurityManager::getInstance().auth(bundle);
00408                                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00409                                                                         // sign requested, but no key is available
00410                                                                         IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00411                                                                 }
00412                                                         }
00413 #endif
00414 
00415                                                         // enable cancellation during transmission
00416                                                         ibrcommon::Thread::CancelProtector __enable_cancellation__(true);
00417 
00418                                                         // send bundle
00419                                                         _connection << bundle;
00420 
00421                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00422                                                         // send transfer aborted event
00423                                                         TransferAbortedEvent::raise(EID(_connection._node.getURI()), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00424                                                 }
00425                                                 
00426                                                 // unset the current transfer
00427                                                 _current_transfer = dtn::data::BundleID();
00428                                                 
00429                                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00430                                                 switch (ex.reason)
00431                                                 {
00432                                                         case ibrcommon::QueueUnblockedException::QUEUE_ERROR:
00433                                                         case ibrcommon::QueueUnblockedException::QUEUE_ABORT:
00434                                                                 throw;
00435                                                         case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT:
00436                                                         {
00437                                                                 // send a keepalive
00438                                                                 _connection.keepalive();
00439                                                         }
00440                                                 }
00441                                         }
00442 
00443                                         // idle a little bit
00444                                         yield();
00445                                 }
00446                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00447                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00448                                 return;
00449                         } catch (const std::exception &ex) {
00450                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00451                         }
00452 
00453                         _connection.stop();
00454                 }
00455 
00456                 void TCPConnection::clearQueue()
00457                 {
00458                         // requeue all bundles still queued
00459                         try {
00460                                 while (true)
00461                                 {
00462                                         const dtn::data::BundleID id = _sender.getnpop();
00463 
00464                                         // raise transfer abort event for all bundles without an ACK
00465                                         dtn::routing::RequeueBundleEvent::raise(_node.getURI(), id);
00466                                 }
00467                         } catch (const ibrcommon::QueueUnblockedException&) {
00468                                 // queue emtpy
00469                         }
00470 
00471                         // requeue all bundles still in transit
00472                         try {
00473                                 while (true)
00474                                 {
00475                                         const dtn::data::BundleID id = _sentqueue.getnpop();
00476 
00477                                         if (_lastack > 0)
00478                                         {
00479                                                 // some data are already acknowledged, make a fragment?
00480                                                 //TODO: make a fragment
00481                                                 dtn::routing::RequeueBundleEvent::raise(_node.getURI(), id);
00482                                         }
00483                                         else
00484                                         {
00485                                                 // raise transfer abort event for all bundles without an ACK
00486                                                 dtn::routing::RequeueBundleEvent::raise(_node.getURI(), id);
00487                                         }
00488 
00489                                         // set last ack to zero
00490                                         _lastack = 0;
00491                                 }
00492                         } catch (const ibrcommon::QueueUnblockedException&) {
00493                                 // queue emtpy
00494                         }
00495                 }
00496 
00497                 void TCPConnection::keepalive()
00498                 {
00499                         _stream.keepalive();
00500                 }
00501 
00502                 void TCPConnection::Sender::finally()
00503                 {
00504                         // notify the aborted transfer of the last bundle
00505                         if (_current_transfer != dtn::data::BundleID())
00506                         {
00507                                 dtn::routing::RequeueBundleEvent::raise(_connection._node.getURI(), _current_transfer);
00508                         }
00509                 }
00510 
00511                 bool TCPConnection::match(const dtn::core::Node &n) const
00512                 {
00513                         return (_node == n);
00514                 }
00515 
00516                 bool TCPConnection::match(const dtn::data::EID &destination) const
00517                 {
00518                         return (_node.getURI() == destination.getNodeEID());
00519                 }
00520 
00521                 bool TCPConnection::match(const NodeEvent &evt) const
00522                 {
00523                         const dtn::core::Node &n = evt.getNode();
00524                         return match(n);
00525                 }
00526         }
00527 }

Generated on Wed Mar 30 2011 11:11:49 for IBR-DTNSuite by  doxygen 1.7.1