• Main Page
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

daemon/src/net/TCPConnection.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConnection.cpp
00003  *
00004  *  Created on: 26.04.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "core/BundleCore.h"
00009 #include "core/GlobalEvent.h"
00010 #include "core/BundleEvent.h"
00011 #include "core/BundleStorage.h"
00012 #include "Configuration.h"
00013 
00014 #include "net/TCPConvergenceLayer.h"
00015 #include "net/BundleReceivedEvent.h"
00016 #include "net/ConnectionEvent.h"
00017 #include "net/TransferCompletedEvent.h"
00018 #include "net/TransferAbortedEvent.h"
00019 
00020 #include <ibrcommon/TimeMeasurement.h>
00021 #include <ibrcommon/net/NetInterface.h>
00022 #include <ibrcommon/Logger.h>
00023 
00024 #include <iostream>
00025 #include <iomanip>
00026 
00027 namespace dtn
00028 {
00029         namespace net
00030         {
00031                 /*
00032                  * class TCPConnection
00033                  */
00034                 TCPConvergenceLayer::TCPConnection::TCPConnection(GenericServer<TCPConnection> &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout)
00035                  : GenericConnection<TCPConvergenceLayer::TCPConnection>((GenericServer<TCPConvergenceLayer::TCPConnection>&)tcpsrv), ibrcommon::DetachedThread(),
00036                    _peer(), _node(Node::NODE_CONNECTED), _tcpstream(stream), _stream(*this, *stream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00037                    _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0)
00038                 {
00039                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00040 
00041                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00042                         {
00043                                 stream->enableNoDelay();
00044                         }
00045 
00046                         stream->enableLinger(10);
00047                         stream->enableKeepalive();
00048                         _node.setProtocol(Node::CONN_TCPIP);
00049                 }
00050 
00051                 TCPConvergenceLayer::TCPConnection::~TCPConnection()
00052                 {
00053                 }
00054 
00055                 void TCPConvergenceLayer::TCPConnection::queue(const dtn::data::BundleID &bundle)
00056                 {
00057                         _sender.push(bundle);
00058                 }
00059 
00060                 const StreamContactHeader TCPConvergenceLayer::TCPConnection::getHeader() const
00061                 {
00062                         return _peer;
00063                 }
00064 
00065                 const dtn::core::Node& TCPConvergenceLayer::TCPConnection::getNode() const
00066                 {
00067                         return _node;
00068                 }
00069 
00070                 void TCPConvergenceLayer::TCPConnection::rejectTransmission()
00071                 {
00072                         _stream.reject();
00073                 }
00074 
00075                 void TCPConvergenceLayer::TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases csc)
00076                 {
00077                 }
00078 
00079                 void TCPConvergenceLayer::TCPConnection::eventTimeout()
00080                 {
00081                         // event
00082                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00083 
00084                         // stop the receiver thread
00085                         this->stop();
00086                 }
00087 
00088                 void TCPConvergenceLayer::TCPConnection::eventError()
00089                 {
00090                 }
00091 
00092                 void TCPConvergenceLayer::TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00093                 {
00094                         _peer = header;
00095                         _node.setURI(header._localeid.getString());
00096                         _keepalive_timeout = header._keepalive * 1000;
00097 
00098                         // raise up event
00099                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00100                 }
00101 
00102                 void TCPConvergenceLayer::TCPConnection::eventConnectionDown()
00103                 {
00104                         IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00105 
00106                         try {
00107                                 // stop the sender
00108                                 _sender.stop();
00109                         } catch (const ibrcommon::ThreadException &ex) {
00110                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00111                         }
00112 
00113                         if (_peer._localeid != dtn::data::EID())
00114                         {
00115                                 // event
00116                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00117                         }
00118                 }
00119 
00120                 void TCPConvergenceLayer::TCPConnection::eventBundleRefused()
00121                 {
00122                         try {
00123                                 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00124 
00125                                 // requeue the bundle
00126                                 TransferAbortedEvent::raise(EID(_node.getURI()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00127 
00128                                 // set ACK to zero
00129                                 _lastack = 0;
00130 
00131                         } catch (ibrcommon::QueueUnblockedException) {
00132                                 // pop on empty queue!
00133                                 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00134                         }
00135                 }
00136 
00137                 void TCPConvergenceLayer::TCPConnection::eventBundleForwarded()
00138                 {
00139                         try {
00140                                 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00141 
00142                                 // signal completion of the transfer
00143                                 TransferCompletedEvent::raise(EID(_node.getURI()), bundle);
00144 
00145                                 // raise bundle event
00146                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00147 
00148                                 // set ACK to zero
00149                                 _lastack = 0;
00150                         } catch (ibrcommon::QueueUnblockedException) {
00151                                 // pop on empty queue!
00152                                 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00153                         }
00154                 }
00155 
00156                 void TCPConvergenceLayer::TCPConnection::eventBundleAck(size_t ack)
00157                 {
00158                         _lastack = ack;
00159                 }
00160 
00161                 void TCPConvergenceLayer::TCPConnection::initialize()
00162                 {
00163                         // start the receiver for incoming bundles + handshake
00164                         try {
00165                                 start();
00166                         } catch (const ibrcommon::ThreadException &ex) {
00167                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00168                         }
00169                 }
00170 
00171                 void TCPConvergenceLayer::TCPConnection::shutdown()
00172                 {
00173                         // shutdown
00174                         _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00175 
00176                         try {
00177                                 // abort the connection thread
00178                                 this->stop();
00179                         } catch (const ibrcommon::ThreadException &ex) {
00180                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00181                         }
00182                 }
00183 
00184                 void TCPConvergenceLayer::TCPConnection::finally()
00185                 {
00186                         IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00187 
00188                         // close the tcpstream
00189                         try {
00190                                 _tcpstream->close();
00191                         } catch (const ibrcommon::ConnectionClosedException&) { };
00192 
00193                         try {
00194                                 // shutdown the sender thread
00195                                 _sender.stop();
00196                         } catch (const std::exception&) { };
00197 
00198                         try {
00199                                 ibrcommon::MutexLock l(_server.mutex());
00200                                 _server.remove(this);
00201                         } catch (const ibrcommon::MutexException&) { };
00202 
00203                         // clear the queue
00204                         clearQueue();
00205                 }
00206 
00207                 void TCPConvergenceLayer::TCPConnection::run()
00208                 {
00209                         try {
00210                                 // do the handshake
00211                                 char flags = 0;
00212 
00213                                 // enable ACKs and NACKs
00214                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00215                                 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00216 
00217                                 // do the handshake
00218                                 _stream.handshake(_name, _timeout, flags);
00219 
00220                                 // start the sender
00221                                 _sender.start();
00222 
00223                                 while (_stream.good())
00224                                 {
00225                                         try {
00226                                                 // create a new empty bundle
00227                                                 dtn::data::Bundle bundle;
00228 
00229                                                 // deserialize the bundle
00230                                                 (*this) >> bundle;
00231 
00232                                                 // check the bundle
00233                                                 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00234                                                 {
00235                                                         // invalid bundle!
00236                                                         throw dtn::data::Validator::RejectedException("destination or source EID is null");
00237                                                 }
00238 
00239                                                 // raise default bundle received event
00240                                                 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle);
00241                                         }
00242                                         catch (const dtn::data::Validator::RejectedException &ex)
00243                                         {
00244                                                 // bundle rejected
00245                                                 rejectTransmission();
00246 
00247                                                 // display the rejection
00248                                                 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00249                                         }
00250 
00251                                         yield();
00252                                 }
00253                         } catch (const ibrcommon::ThreadException &ex) {
00254                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00255                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00256                         } catch (const std::exception &ex) {
00257                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00258                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00259                         }
00260                 }
00261 
00262 
00263                 TCPConvergenceLayer::TCPConnection& operator>>(TCPConvergenceLayer::TCPConnection &conn, dtn::data::Bundle &bundle)
00264                 {
00265                         std::iostream &stream = conn._stream;
00266 
00267                         // activate exceptions for this method
00268                         if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00269 
00270                         dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle;
00271                         return conn;
00272                 }
00273 
00274                 TCPConvergenceLayer::TCPConnection& operator<<(TCPConvergenceLayer::TCPConnection &conn, const dtn::data::Bundle &bundle)
00275                 {
00276                         // prepare a measurement
00277                         ibrcommon::TimeMeasurement m;
00278 
00279                         std::iostream &stream = conn._stream;
00280 
00281                         // create a serializer
00282                         dtn::data::DefaultSerializer serializer(stream);
00283 
00284                         // put the bundle into the sentqueue
00285                         conn._sentqueue.push(bundle);
00286 
00287                         // start the measurement
00288                         m.start();
00289 
00290                         try {
00291                                 // activate exceptions for this method
00292                                 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00293 
00294                                 // transmit the bundle
00295                                 serializer << bundle;
00296 
00297                                 // flush the stream
00298                                 stream << std::flush;
00299 
00300                                 // stop the time measurement
00301                                 m.stop();
00302 
00303                                 // get throughput
00304                                 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00305 
00306                                 // print out throughput
00307                                 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00308                                                 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00309 
00310                         } catch (const ibrcommon::Exception &ex) {
00311                                 // the connection not available
00312                                 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00313 
00314                                 // forward exception
00315                                 throw;
00316                         }
00317 
00318                         return conn;
00319                 }
00320 
00321                 TCPConvergenceLayer::TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout)
00322                  : _abort(false), _connection(connection), _keepalive_timeout(keepalive_timeout)
00323                 {
00324                 }
00325 
00326                 TCPConvergenceLayer::TCPConnection::Sender::~Sender()
00327                 {
00328                         join();
00329                 }
00330 
00331                 bool TCPConvergenceLayer::TCPConnection::Sender::__cancellation()
00332                 {
00333                         // cancel the main thread in here
00334                         _abort = true;
00335                         this->abort();
00336 
00337                         // return false, to signal that further cancel (the hardway) is needed
00338                         return false;
00339                 }
00340 
00341                 void TCPConvergenceLayer::TCPConnection::Sender::run()
00342                 {
00343                         // The queue is not cancel-safe with uclibc, so we need to
00344                         // disable cancel here
00345                         int oldstate;
00346                         ibrcommon::Thread::disableCancel(oldstate);
00347 
00348                         try {
00349                                 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00350 
00351                                 while (!_abort)
00352                                 {
00353                                         try {
00354                                                 dtn::data::BundleID _current_transfer = getnpop(true, _keepalive_timeout);
00355 
00356                                                 try {
00357                                                         // read the bundle out of the storage
00358                                                         const dtn::data::Bundle bundle = storage.get(_current_transfer);
00359 
00360                                                         // enable cancellation during transmission
00361                                                         ibrcommon::Thread::CancelProtector cprotect(true);
00362 
00363                                                         // send bundle
00364                                                         _connection << bundle;
00365 
00366                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00367                                                         // send transfer aborted event
00368                                                         TransferAbortedEvent::raise(EID(_connection._node.getURI()), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00369                                                 }
00370                                                 
00371                                                 // unset the current transfer
00372                                                 _current_transfer = dtn::data::BundleID();
00373                                                 
00374                                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00375                                                 switch (ex.reason)
00376                                                 {
00377                                                         case ibrcommon::QueueUnblockedException::QUEUE_ERROR:
00378                                                         case ibrcommon::QueueUnblockedException::QUEUE_ABORT:
00379                                                                 throw;
00380                                                         case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT:
00381                                                         {
00382                                                                 // send a keepalive
00383                                                                 _connection.keepalive();
00384                                                         }
00385                                                 }
00386                                         }
00387 
00388                                         // idle a little bit
00389                                         yield();
00390                                 }
00391                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00392                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00393                                 return;
00394                         } catch (const std::exception &ex) {
00395                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00396                         }
00397 
00398                         _connection.stop();
00399                 }
00400 
00401                 void TCPConvergenceLayer::TCPConnection::clearQueue()
00402                 {
00403                         try {
00404                                 while (true)
00405                                 {
00406                                         const dtn::data::BundleID id = _sender.getnpop();
00407 
00408                                         if (_lastack > 0)
00409                                         {
00410                                                 // some data are already acknowledged, make a fragment?
00411                                                 //TODO: make a fragment
00412                                                 TransferAbortedEvent::raise(EID(_node.getURI()), id, dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN);
00413                                         }
00414                                         else
00415                                         {
00416                                                 // raise transfer abort event for all bundles without an ACK
00417                                                 TransferAbortedEvent::raise(EID(_node.getURI()), id, dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN);
00418                                         }
00419 
00420                                         // set last ack to zero
00421                                         _lastack = 0;
00422                                 }
00423                         } catch (ibrcommon::QueueUnblockedException) {
00424                                 // queue emtpy
00425                         }
00426                 }
00427 
00428                 void TCPConvergenceLayer::TCPConnection::keepalive()
00429                 {
00430                         _stream.keepalive();
00431                 }
00432 
00433                 void TCPConvergenceLayer::TCPConnection::Sender::finally()
00434                 {
00435                         // notify the aborted transfer of the last bundle
00436                         if (_current_transfer != dtn::data::BundleID())
00437                         {
00438                                 TransferAbortedEvent::raise(EID(_connection._node.getURI()), _current_transfer, dtn::net::TransferAbortedEvent::REASON_CONNECTION_DOWN);
00439                         }
00440                 }
00441         }
00442 }

Generated on Thu Nov 11 2010 09:49:47 for IBR-DTNSuite by  doxygen 1.7.1