IBR-DTNSuite 0.6

daemon/src/net/TCPConnection.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConnection.cpp
00003  *
00004  *  Created on: 26.04.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "core/BundleCore.h"
00011 #include "core/BundleEvent.h"
00012 #include "core/BundleStorage.h"
00013 
00014 #include "net/TCPConvergenceLayer.h"
00015 #include "net/BundleReceivedEvent.h"
00016 #include "net/ConnectionEvent.h"
00017 #include "net/TransferCompletedEvent.h"
00018 #include "net/TransferAbortedEvent.h"
00019 #include "routing/RequeueBundleEvent.h"
00020 
00021 #include <ibrcommon/net/tcpclient.h>
00022 #include <ibrcommon/TimeMeasurement.h>
00023 #include <ibrcommon/net/vinterface.h>
00024 #include <ibrcommon/Logger.h>
00025 
00026 #include <iostream>
00027 #include <iomanip>
00028 
00029 #ifdef WITH_BUNDLE_SECURITY
00030 #include "security/SecurityManager.h"
00031 #endif
00032 
00033 namespace dtn
00034 {
00035         namespace net
00036         {
00037                 /*
00038                  * class TCPConnection
00039                  */
00040                 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID &name, const size_t timeout)
00041                  : _peer(), _node(Node::NODE_CONNECTED), _tcpstream(stream), _stream(*this, *stream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00042                    _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv)
00043                 {
00044                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00045 
00046                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00047                         {
00048                                 stream->enableNoDelay();
00049                         }
00050 
00051                         stream->enableLinger(10);
00052                         stream->enableKeepalive();
00053 
00054                         // add default TCP connection
00055                         _node.add(dtn::core::Node::URI("0.0.0.0", Node::CONN_TCPIP));
00056                 }
00057 
00058                 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout)
00059                  : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()), _sender(*this, _keepalive_timeout),
00060                    _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv)
00061                 {
00062                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00063                 }
00064 
00065                 TCPConnection::~TCPConnection()
00066                 {
00067                         // wait until the sender thread is finished
00068                         _sender.join();
00069                 }
00070 
00071                 void TCPConnection::queue(const dtn::data::BundleID &bundle)
00072                 {
00073                         _sender.push(bundle);
00074                 }
00075 
00076                 const StreamContactHeader& TCPConnection::getHeader() const
00077                 {
00078                         return _peer;
00079                 }
00080 
00081                 const dtn::core::Node& TCPConnection::getNode() const
00082                 {
00083                         return _node;
00084                 }
00085 
00086                 void TCPConnection::rejectTransmission()
00087                 {
00088                         _stream.reject();
00089                 }
00090 
00091                 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases)
00092                 {
00093                 }
00094 
00095                 void TCPConnection::eventTimeout()
00096                 {
00097                         // event
00098                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00099 
00100                         // stop the receiver thread
00101                         this->stop();
00102                 }
00103 
00104                 void TCPConnection::eventError()
00105                 {
00106                 }
00107 
00108                 void TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00109                 {
00110                         _peer = header;
00111                         _node.setEID(header._localeid);
00112                         _keepalive_timeout = header._keepalive * 1000;
00113 
00114                         // set the incoming timer if set (> 0)
00115                         if (_peer._keepalive > 0)
00116                         {
00117                                 // set the timer
00118                                 _tcpstream->setTimeout(header._keepalive * 2);
00119                         }
00120 
00121                         // raise up event
00122                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00123                 }
00124 
00125                 void TCPConnection::eventConnectionDown()
00126                 {
00127                         IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00128 
00129                         try {
00130                                 // stop the sender
00131                                 _sender.stop();
00132                         } catch (const ibrcommon::ThreadException &ex) {
00133                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00134                         }
00135 
00136                         if (_peer._localeid != dtn::data::EID())
00137                         {
00138                                 // event
00139                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00140                         }
00141                 }
00142 
00143                 void TCPConnection::eventBundleRefused()
00144                 {
00145                         try {
00146                                 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00147 
00148                                 // requeue the bundle
00149                                 TransferAbortedEvent::raise(EID(_node.getEID()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00150 
00151                                 // set ACK to zero
00152                                 _lastack = 0;
00153 
00154                         } catch (const ibrcommon::QueueUnblockedException&) {
00155                                 // pop on empty queue!
00156                                 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00157                         }
00158                 }
00159 
00160                 void TCPConnection::eventBundleForwarded()
00161                 {
00162                         try {
00163                                 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00164 
00165                                 // signal completion of the transfer
00166                                 TransferCompletedEvent::raise(_node.getEID(), bundle);
00167 
00168                                 // raise bundle event
00169                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00170 
00171                                 // set ACK to zero
00172                                 _lastack = 0;
00173                         } catch (const ibrcommon::QueueUnblockedException&) {
00174                                 // pop on empty queue!
00175                                 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00176                         }
00177                 }
00178 
00179                 void TCPConnection::eventBundleAck(size_t ack)
00180                 {
00181                         _lastack = ack;
00182                 }
00183 
00184                 void TCPConnection::initialize()
00185                 {
00186                         // start the receiver for incoming bundles + handshake
00187                         try {
00188                                 start();
00189                         } catch (const ibrcommon::ThreadException &ex) {
00190                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00191                         }
00192                 }
00193 
00194                 void TCPConnection::shutdown()
00195                 {
00196                         // shutdown
00197                         _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00198 
00199                         try {
00200                                 // abort the connection thread
00201                                 ibrcommon::DetachedThread::stop();
00202                         } catch (const ibrcommon::ThreadException &ex) {
00203                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00204                         }
00205                 }
00206 
00207                 bool TCPConnection::__cancellation()
00208                 {
00209                         // close the stream
00210                         try {
00211                                 (*_tcpstream).close();
00212                         } catch (const ibrcommon::ConnectionClosedException&) { };
00213 
00214                         return true;
00215                 }
00216 
00217                 void TCPConnection::finally()
00218                 {
00219                         IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00220 
00221                         try {
00222                                 // shutdown the sender thread
00223                                 _sender.stop();
00224                         } catch (const std::exception&) { };
00225 
00226                         // close the tcpstream
00227                         try {
00228                                 _tcpstream->close();
00229                         } catch (const ibrcommon::ConnectionClosedException&) { };
00230 
00231                         try {
00232                                 _callback.connectionDown(this);
00233                         } catch (const ibrcommon::MutexException&) { };
00234 
00235                         // clear the queue
00236                         clearQueue();
00237                 }
00238 
00239                 void TCPConnection::setup()
00240                 {
00241                         // variables for address and port
00242                         std::string address = "0.0.0.0";
00243                         unsigned int port = 0;
00244 
00245                         // try to connect to the other side
00246                         try {
00247                                 const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP);
00248                                 if (uri_list.empty()) throw ibrcommon::tcpclient::SocketException("no address available to connect");
00249 
00250                                 // decode address and port
00251                                 const dtn::core::Node::URI &uri = uri_list.front();
00252                                 uri.decode(address, port);
00253 
00254                                 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream);
00255                                 client.open(address, port, _timeout);
00256 
00257                                 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00258                                 {
00259                                         _tcpstream->enableNoDelay();
00260                                 }
00261 
00262                                 _tcpstream->enableLinger(10);
00263                                 _tcpstream->enableKeepalive();
00264                         } catch (const ibrcommon::tcpclient::SocketException&) {
00265                                 // error on open, requeue all bundles in the queue
00266                                 IBRCOMMON_LOGGER(warning) << "connection to " << _node.toString() << " (" << address << ":" << port << ") failed" << IBRCOMMON_LOGGER_ENDL;
00267                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00268                                 throw;
00269                         } catch (const bad_cast&) { };
00270                 }
00271 
00272                 void TCPConnection::run()
00273                 {
00274                         try {
00275                                 // do the handshake
00276                                 char flags = 0;
00277 
00278                                 // enable ACKs and NACKs
00279                                 flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00280                                 flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00281 
00282                                 // do the handshake
00283                                 _stream.handshake(_name, _timeout, flags);
00284 
00285                                 // start the sender
00286                                 _sender.start();
00287 
00288                                 while (!_stream.eof())
00289                                 {
00290                                         try {
00291                                                 // create a new empty bundle
00292                                                 dtn::data::Bundle bundle;
00293 
00294                                                 // deserialize the bundle
00295                                                 (*this) >> bundle;
00296 
00297                                                 // check the bundle
00298                                                 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00299                                                 {
00300                                                         // invalid bundle!
00301                                                         throw dtn::data::Validator::RejectedException("destination or source EID is null");
00302                                                 }
00303 
00304                                                 // raise default bundle received event
00305                                                 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle);
00306                                         }
00307                                         catch (const dtn::data::Validator::RejectedException &ex)
00308                                         {
00309                                                 // bundle rejected
00310                                                 rejectTransmission();
00311 
00312                                                 // display the rejection
00313                                                 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00314                                         }
00315                                         catch (const dtn::InvalidDataException &ex) {
00316                                                 // bundle rejected
00317                                                 rejectTransmission();
00318 
00319                                                 // display the rejection
00320                                                 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00321                                         }
00322 
00323                                         yield();
00324                                 }
00325                         } catch (const ibrcommon::ThreadException &ex) {
00326                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00327                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00328                         } catch (const std::exception &ex) {
00329                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00330                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00331                         }
00332                 }
00333 
00334 
00335                 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle)
00336                 {
00337                         std::iostream &stream = conn._stream;
00338 
00339                         // check if the stream is still good
00340                         if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00341 
00342                         dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle;
00343                         return conn;
00344                 }
00345 
00346                 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle)
00347                 {
00348                         // prepare a measurement
00349                         ibrcommon::TimeMeasurement m;
00350 
00351                         std::iostream &stream = conn._stream;
00352 
00353                         // create a serializer
00354                         dtn::data::DefaultSerializer serializer(stream);
00355 
00356                         // put the bundle into the sentqueue
00357                         conn._sentqueue.push(bundle);
00358 
00359                         // start the measurement
00360                         m.start();
00361 
00362                         try {
00363                                 // activate exceptions for this method
00364                                 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00365 
00366                                 // transmit the bundle
00367                                 serializer << bundle;
00368 
00369                                 // flush the stream
00370                                 stream << std::flush;
00371 
00372                                 // stop the time measurement
00373                                 m.stop();
00374 
00375                                 // get throughput
00376                                 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00377 
00378                                 // print out throughput
00379                                 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00380                                                 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00381 
00382                         } catch (const ibrcommon::Exception &ex) {
00383                                 // the connection not available
00384                                 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00385 
00386                                 // forward exception
00387                                 throw;
00388                         }
00389 
00390                         return conn;
00391                 }
00392 
00393                 TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout)
00394                  : _connection(connection), _keepalive_timeout(keepalive_timeout)
00395                 {
00396                 }
00397 
00398                 TCPConnection::Sender::~Sender()
00399                 {
00400                 }
00401 
00402                 bool TCPConnection::Sender::__cancellation()
00403                 {
00404                         // cancel the main thread in here
00405                         ibrcommon::Queue<dtn::data::BundleID>::abort();
00406 
00407                         return true;
00408                 }
00409 
00410                 void TCPConnection::Sender::run()
00411                 {
00412                         try {
00413                                 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00414 
00415                                 while (_connection.good())
00416                                 {
00417                                         try {
00418                                                 _current_transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true, _keepalive_timeout);
00419 
00420                                                 try {
00421                                                         // read the bundle out of the storage
00422                                                         dtn::data::Bundle bundle = storage.get(_current_transfer);
00423 
00424 #ifdef WITH_BUNDLE_SECURITY
00425                                                         const dtn::daemon::Configuration::Security::Level seclevel =
00426                                                                         dtn::daemon::Configuration::getInstance().getSecurity().getLevel();
00427 
00428                                                         if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED)
00429                                                         {
00430                                                                 try {
00431                                                                         dtn::security::SecurityManager::getInstance().auth(bundle);
00432                                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00433                                                                         // sign requested, but no key is available
00434                                                                         IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00435                                                                 }
00436                                                         }
00437 #endif
00438                                                         // send bundle
00439                                                         _connection << bundle;
00440                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00441                                                         // send transfer aborted event
00442                                                         TransferAbortedEvent::raise(_connection._node.getEID(), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00443                                                 }
00444                                                 
00445                                                 // unset the current transfer
00446                                                 _current_transfer = dtn::data::BundleID();
00447                                                 
00448                                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00449                                                 switch (ex.reason)
00450                                                 {
00451                                                         case ibrcommon::QueueUnblockedException::QUEUE_ERROR:
00452                                                         case ibrcommon::QueueUnblockedException::QUEUE_ABORT:
00453                                                                 throw;
00454                                                         case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT:
00455                                                         {
00456                                                                 // send a keepalive
00457                                                                 _connection.keepalive();
00458                                                         }
00459                                                 }
00460                                         }
00461 
00462                                         // idle a little bit
00463                                         yield();
00464                                 }
00465                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00466                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00467                                 return;
00468                         } catch (const std::exception &ex) {
00469                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00470                         }
00471 
00472                         _connection.stop();
00473                 }
00474 
00475                 void TCPConnection::clearQueue()
00476                 {
00477                         // requeue all bundles still queued
00478                         try {
00479                                 while (true)
00480                                 {
00481                                         const dtn::data::BundleID id = _sender.getnpop();
00482 
00483                                         // raise transfer abort event for all bundles without an ACK
00484                                         dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00485                                 }
00486                         } catch (const ibrcommon::QueueUnblockedException&) {
00487                                 // queue emtpy
00488                         }
00489 
00490                         // requeue all bundles still in transit
00491                         try {
00492                                 while (true)
00493                                 {
00494                                         const dtn::data::BundleID id = _sentqueue.getnpop();
00495 
00496                                         if (_lastack > 0)
00497                                         {
00498                                                 // some data are already acknowledged, make a fragment?
00499                                                 //TODO: make a fragment
00500                                                 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00501                                         }
00502                                         else
00503                                         {
00504                                                 // raise transfer abort event for all bundles without an ACK
00505                                                 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00506                                         }
00507 
00508                                         // set last ack to zero
00509                                         _lastack = 0;
00510                                 }
00511                         } catch (const ibrcommon::QueueUnblockedException&) {
00512                                 // queue emtpy
00513                         }
00514                 }
00515 
00516                 void TCPConnection::keepalive()
00517                 {
00518                         _stream.keepalive();
00519                 }
00520 
00521                 bool TCPConnection::good() const
00522                 {
00523                         return _tcpstream->good();
00524                 }
00525 
00526                 void TCPConnection::Sender::finally()
00527                 {
00528                         // notify the aborted transfer of the last bundle
00529                         if (_current_transfer != dtn::data::BundleID())
00530                         {
00531                                 dtn::routing::RequeueBundleEvent::raise(_connection._node.getEID(), _current_transfer);
00532                         }
00533                 }
00534 
00535                 bool TCPConnection::match(const dtn::core::Node &n) const
00536                 {
00537                         return (_node == n);
00538                 }
00539 
00540                 bool TCPConnection::match(const dtn::data::EID &destination) const
00541                 {
00542                         return (_node.getEID() == destination.getNode());
00543                 }
00544 
00545                 bool TCPConnection::match(const NodeEvent &evt) const
00546                 {
00547                         const dtn::core::Node &n = evt.getNode();
00548                         return match(n);
00549                 }
00550         }
00551 }