IBR-DTNSuite 0.6

daemon/src/net/TCPConnection.cpp

Go to the documentation of this file.
00001 /*
00002  * TCPConnection.cpp
00003  *
00004  *  Created on: 26.04.2010
00005  *      Author: morgenro
00006  */
00007 
00008 #include "config.h"
00009 #include "Configuration.h"
00010 #include "core/BundleCore.h"
00011 #include "core/BundleEvent.h"
00012 #include "core/BundleStorage.h"
00013 
00014 #include "net/TCPConvergenceLayer.h"
00015 #include "net/BundleReceivedEvent.h"
00016 #include "net/ConnectionEvent.h"
00017 #include "net/TransferCompletedEvent.h"
00018 #include "net/TransferAbortedEvent.h"
00019 #include "routing/RequeueBundleEvent.h"
00020 
00021 #include <ibrdtn/data/ScopeControlHopLimitBlock.h>
00022 #include <ibrcommon/net/tcpclient.h>
00023 #include <ibrcommon/TimeMeasurement.h>
00024 #include <ibrcommon/net/vinterface.h>
00025 #include <ibrcommon/Logger.h>
00026 
00027 #include <iostream>
00028 #include <iomanip>
00029 
00030 #ifdef WITH_TLS
00031 #include <openssl/x509.h>
00032 #include "security/SecurityCertificateManager.h"
00033 #include <ibrcommon/TLSExceptions.h>
00034 #endif
00035 
00036 #ifdef WITH_BUNDLE_SECURITY
00037 #include "security/SecurityManager.h"
00038 #endif
00039 
00040 namespace dtn
00041 {
00042         namespace net
00043         {
00044                 /*
00045                  * class TCPConnection
00046                  */
00047                 TCPConnection::TCPConnection(TCPConvergenceLayer & tcpsrv, ibrcommon::tcpstream *stream, const dtn::data::EID & name, const size_t timeout)
00048 #ifdef WITH_TLS
00049                  : _peer(), _node(name), _tcpstream(stream), _tlsstream(stream), _stream(*this, _tlsstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00050                    _sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0)
00051 #else
00052                  : _peer(), _node(name), _tcpstream(stream), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00053                    _sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0)
00054 #endif
00055                 {
00056                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00057 
00058                         if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00059                         {
00060                                 stream->enableNoDelay();
00061                         }
00062 
00063                         // add default TCP connection
00064                         _node.clear();
00065                         _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, "0.0.0.0") );
00066 
00067                         _flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00068                         _flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00069 
00070 #ifdef WITH_TLS
00071                         // set tls mode to server
00072                         _tlsstream.setServer(true);
00073 #endif
00074                 }
00075 
00076                 TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, const dtn::data::EID &name, const size_t timeout)
00077 #ifdef WITH_TLS
00078                  : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _tlsstream(_tcpstream.get()), _stream(*this, _tlsstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00079                    _sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0)
00080 #else
00081                  : _peer(), _node(node), _tcpstream(new ibrcommon::tcpclient()), _stream(*this, *_tcpstream, dtn::daemon::Configuration::getInstance().getNetwork().getTCPChunkSize()),
00082                    _sender(*this, _keepalive_timeout), _name(name), _timeout(timeout), _lastack(0), _keepalive_timeout(0), _callback(tcpsrv), _flags(0)
00083 #endif
00084                 {
00085                         _stream.exceptions(std::ios::badbit | std::ios::eofbit);
00086 
00087                         _flags |= dtn::streams::StreamContactHeader::REQUEST_ACKNOWLEDGMENTS;
00088                         _flags |= dtn::streams::StreamContactHeader::REQUEST_NEGATIVE_ACKNOWLEDGMENTS;
00089                 }
00090 
00091                 TCPConnection::~TCPConnection()
00092                 {
00093                         // wait until the sender thread is finished
00094                         _sender.join();
00095                 }
00096 
00097                 void TCPConnection::queue(const dtn::data::BundleID &bundle)
00098                 {
00099                         _sender.push(bundle);
00100                 }
00101 
00102                 const StreamContactHeader& TCPConnection::getHeader() const
00103                 {
00104                         return _peer;
00105                 }
00106 
00107                 const dtn::core::Node& TCPConnection::getNode() const
00108                 {
00109                         return _node;
00110                 }
00111 
00112                 void TCPConnection::rejectTransmission()
00113                 {
00114                         _stream.reject();
00115                 }
00116 
00117                 void TCPConnection::eventShutdown(StreamConnection::ConnectionShutdownCases)
00118                 {
00119                 }
00120 
00121                 void TCPConnection::eventTimeout()
00122                 {
00123                         // event
00124                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_TIMEOUT, _node);
00125 
00126                         // stop the receiver thread
00127                         this->stop();
00128                 }
00129 
00130                 void TCPConnection::eventError()
00131                 {
00132                 }
00133 
00134                 void TCPConnection::eventConnectionUp(const StreamContactHeader &header)
00135                 {
00136                         _peer = header;
00137 
00138                         // copy old attributes and urls to the new node object
00139                         Node n_old = _node;
00140                         _node = Node(header._localeid);
00141                         _node += n_old;
00142 
00143                         _keepalive_timeout = header._keepalive * 1000;
00144 
00145 #ifdef WITH_TLS
00146                         /* if both nodes support TLS, activate it */
00147                         if((_peer._flags & dtn::streams::StreamContactHeader::REQUEST_TLS)
00148                                         && (_flags & dtn::streams::StreamContactHeader::REQUEST_TLS)){
00149                                 try{
00150                                         X509 *peer_cert = _tlsstream.activate();
00151                                         if(!dtn::security::SecurityCertificateManager::validateSubject(peer_cert, _peer.getEID())){
00152                                                 IBRCOMMON_LOGGER(warning) << "TCPConnection: certificate does not fit the EID." << IBRCOMMON_LOGGER_ENDL;
00153                                                 throw ibrcommon::TLSCertificateVerificationException("certificate does not fit the EID");
00154                                         }
00155                                 } catch(...){
00156                                         if(dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
00157                                                 /* close the connection */
00158                                                 IBRCOMMON_LOGGER(warning) << "TCPConnection: TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL;
00159                                                 throw;
00160                                         } else {
00161                                                 IBRCOMMON_LOGGER(warning) << "TCPConnection: TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL;
00162                                         }
00163                                 }
00164                         } else {
00165                                 /* TLS not supported by both Nodes, check if its required */
00166                                 if(dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
00167                                         /* close the connection */
00168                                         IBRCOMMON_LOGGER(warning) << "TCPConnection: TLS not supported by both Peers. Closing the connection." << IBRCOMMON_LOGGER_ENDL;
00169                                         throw ibrcommon::TLSException("TLS not supported by peer.");
00170                                 } else if(_flags & dtn::streams::StreamContactHeader::REQUEST_TLS){
00171                                         IBRCOMMON_LOGGER(warning) << "TCPConnection: TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL;
00172                                 }
00173                                 /* else: this node does not support TLS, should have already printed a warning */
00174                         }
00175 #endif
00176 
00177                         // set the incoming timer if set (> 0)
00178                         if (_peer._keepalive > 0)
00179                         {
00180                                 // set the timer
00181                                 _tcpstream->setTimeout(header._keepalive * 2);
00182                         }
00183 
00184                         // raise up event
00185                         ConnectionEvent::raise(ConnectionEvent::CONNECTION_UP, _node);
00186                 }
00187 
00188                 void TCPConnection::eventConnectionDown()
00189                 {
00190                         IBRCOMMON_LOGGER_DEBUG(40) << "TCPConnection::eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
00191 
00192                         try {
00193                                 // stop the sender
00194                                 _sender.stop();
00195                         } catch (const ibrcommon::ThreadException &ex) {
00196                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::eventConnectionDown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00197                         }
00198 
00199                         if (_peer._localeid != dtn::data::EID())
00200                         {
00201                                 // event
00202                                 ConnectionEvent::raise(ConnectionEvent::CONNECTION_DOWN, _node);
00203                         }
00204                 }
00205 
00206                 void TCPConnection::eventBundleRefused()
00207                 {
00208                         try {
00209                                 const dtn::data::BundleID bundle = _sentqueue.getnpop();
00210 
00211                                 // requeue the bundle
00212                                 TransferAbortedEvent::raise(EID(_node.getEID()), bundle, dtn::net::TransferAbortedEvent::REASON_REFUSED);
00213 
00214                                 // set ACK to zero
00215                                 _lastack = 0;
00216 
00217                         } catch (const ibrcommon::QueueUnblockedException&) {
00218                                 // pop on empty queue!
00219                                 IBRCOMMON_LOGGER(error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00220                         }
00221                 }
00222 
00223                 void TCPConnection::eventBundleForwarded()
00224                 {
00225                         try {
00226                                 const dtn::data::MetaBundle bundle = _sentqueue.getnpop();
00227 
00228                                 // signal completion of the transfer
00229                                 TransferCompletedEvent::raise(_node.getEID(), bundle);
00230 
00231                                 // raise bundle event
00232                                 dtn::core::BundleEvent::raise(bundle, BUNDLE_FORWARDED);
00233 
00234                                 // set ACK to zero
00235                                 _lastack = 0;
00236                         } catch (const ibrcommon::QueueUnblockedException&) {
00237                                 // pop on empty queue!
00238                                 IBRCOMMON_LOGGER(error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
00239                         }
00240                 }
00241 
00242                 void TCPConnection::eventBundleAck(size_t ack)
00243                 {
00244                         _lastack = ack;
00245                 }
00246 
00247                 void TCPConnection::initialize()
00248                 {
00249                         // start the receiver for incoming bundles + handshake
00250                         try {
00251                                 start();
00252                         } catch (const ibrcommon::ThreadException &ex) {
00253                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00254                         }
00255                 }
00256 
00257                 void TCPConnection::shutdown()
00258                 {
00259                         // shutdown
00260                         _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00261 
00262                         try {
00263                                 // abort the connection thread
00264                                 ibrcommon::DetachedThread::stop();
00265                         } catch (const ibrcommon::ThreadException &ex) {
00266                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::shutdown(): ThreadException (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00267                         }
00268                 }
00269 
00270                 bool TCPConnection::__cancellation()
00271                 {
00272                         // close the stream
00273                         try {
00274                                 (*_tcpstream).close();
00275                         } catch (const ibrcommon::ConnectionClosedException&) { };
00276 
00277                         return true;
00278                 }
00279 
00280                 void TCPConnection::finally()
00281                 {
00282                         IBRCOMMON_LOGGER_DEBUG(60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
00283 
00284                         try {
00285                                 // shutdown the sender thread
00286                                 _sender.stop();
00287                         } catch (const std::exception&) { };
00288 
00289                         // close the tcpstream
00290                         try {
00291                                 _tcpstream->close();
00292                         } catch (const ibrcommon::ConnectionClosedException&) { };
00293 
00294                         try {
00295                                 _callback.connectionDown(this);
00296                         } catch (const ibrcommon::MutexException&) { };
00297 
00298                         // clear the queue
00299                         clearQueue();
00300                 }
00301 
00302                 void TCPConnection::setup()
00303                 {
00304                         // variables for address and port
00305                         std::string address = "0.0.0.0";
00306                         unsigned int port = 0;
00307 
00308                         // try to connect to the other side
00309                         try {
00310                                 const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP);
00311 
00312                                 for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); iter++)
00313                                 {
00314                                         try {
00315                                                 // decode address and port
00316                                                 const dtn::core::Node::URI &uri = uri_list.front();
00317                                                 uri.decode(address, port);
00318 
00319                                                 ibrcommon::tcpclient &client = dynamic_cast<ibrcommon::tcpclient&>(*_tcpstream);
00320                                                 client.open(address, port, _timeout);
00321 
00322                                                 if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
00323                                                 {
00324                                                         _tcpstream->enableNoDelay();
00325                                                 }
00326 
00327                                                 // add TCP connection descriptor to the node object
00328                                                 _node.clear();
00329                                                 _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri.value) );
00330 
00331                                                 // connection successful
00332                                                 return;
00333                                         } catch (const ibrcommon::tcpclient::SocketException&) { };
00334                                 }
00335 
00336                                 // no connection has been established
00337                                 throw ibrcommon::tcpclient::SocketException("no address available to connect");
00338 
00339                         } catch (const ibrcommon::tcpclient::SocketException&) {
00340                                 // error on open, requeue all bundles in the queue
00341                                 IBRCOMMON_LOGGER(warning) << "connection to " << _node.toString() << " failed" << IBRCOMMON_LOGGER_ENDL;
00342                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00343                                 throw;
00344                         } catch (const bad_cast&) { };
00345                 }
00346 
00347                 void TCPConnection::run()
00348                 {
00349                         try {
00350                                 // do the handshake
00351                                 _stream.handshake(_name, _timeout, _flags);
00352 
00353                                 // start the sender
00354                                 _sender.start();
00355 
00356                                 while (!_stream.eof())
00357                                 {
00358                                         try {
00359                                                 // create a new empty bundle
00360                                                 dtn::data::Bundle bundle;
00361 
00362                                                 // deserialize the bundle
00363                                                 (*this) >> bundle;
00364 
00365                                                 // check the bundle
00366                                                 if ( ( bundle._destination == EID() ) || ( bundle._source == EID() ) )
00367                                                 {
00368                                                         // invalid bundle!
00369                                                         throw dtn::data::Validator::RejectedException("destination or source EID is null");
00370                                                 }
00371 
00372                                                 // increment value in the scope control hop limit block
00373                                                 try {
00374                                                         dtn::data::ScopeControlHopLimitBlock &schl = bundle.getBlock<dtn::data::ScopeControlHopLimitBlock>();
00375                                                         schl.increment();
00376                                                 } catch (const dtn::data::Bundle::NoSuchBlockFoundException&) { }
00377 
00378                                                 // raise default bundle received event
00379                                                 dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle);
00380                                         }
00381                                         catch (const dtn::data::Validator::RejectedException &ex)
00382                                         {
00383                                                 // bundle rejected
00384                                                 rejectTransmission();
00385 
00386                                                 // display the rejection
00387                                                 IBRCOMMON_LOGGER(warning) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00388                                         }
00389                                         catch (const dtn::InvalidDataException &ex) {
00390                                                 // bundle rejected
00391                                                 rejectTransmission();
00392 
00393                                                 // display the rejection
00394                                                 IBRCOMMON_LOGGER(warning) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00395                                         }
00396 
00397                                         yield();
00398                                 }
00399                         } catch (const ibrcommon::ThreadException &ex) {
00400                                 IBRCOMMON_LOGGER(error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
00401                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00402                         } catch (const std::exception &ex) {
00403                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
00404                                 _stream.shutdown(StreamConnection::CONNECTION_SHUTDOWN_ERROR);
00405                         }
00406                 }
00407 
00408 
00409                 TCPConnection& operator>>(TCPConnection &conn, dtn::data::Bundle &bundle)
00410                 {
00411                         std::iostream &stream = conn._stream;
00412 
00413                         // check if the stream is still good
00414                         if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00415 
00416                         dtn::data::DefaultDeserializer(stream, dtn::core::BundleCore::getInstance()) >> bundle;
00417                         return conn;
00418                 }
00419 
00420                 TCPConnection& operator<<(TCPConnection &conn, const dtn::data::Bundle &bundle)
00421                 {
00422                         // prepare a measurement
00423                         ibrcommon::TimeMeasurement m;
00424 
00425                         std::iostream &stream = conn._stream;
00426 
00427                         // create a serializer
00428                         dtn::data::DefaultSerializer serializer(stream);
00429 
00430                         // put the bundle into the sentqueue
00431                         conn._sentqueue.push(bundle);
00432 
00433                         // start the measurement
00434                         m.start();
00435 
00436                         try {
00437                                 // activate exceptions for this method
00438                                 if (!stream.good()) throw ibrcommon::IOException("stream went bad");
00439 
00440                                 // transmit the bundle
00441                                 serializer << bundle;
00442 
00443                                 // flush the stream
00444                                 stream << std::flush;
00445 
00446                                 // stop the time measurement
00447                                 m.stop();
00448 
00449                                 // get throughput
00450                                 double kbytes_per_second = (serializer.getLength(bundle) / m.getSeconds()) / 1024;
00451 
00452                                 // print out throughput
00453                                 IBRCOMMON_LOGGER_DEBUG(5) << "transfer finished after " << m << " with "
00454                                                 << std::setiosflags(std::ios::fixed) << std::setprecision(2) << kbytes_per_second << " kb/s" << IBRCOMMON_LOGGER_ENDL;
00455 
00456                         } catch (const ibrcommon::Exception &ex) {
00457                                 // the connection not available
00458                                 IBRCOMMON_LOGGER_DEBUG(10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00459 
00460                                 // forward exception
00461                                 throw;
00462                         }
00463 
00464                         return conn;
00465                 }
00466 
00467                 TCPConnection::Sender::Sender(TCPConnection &connection, size_t &keepalive_timeout)
00468                  : _connection(connection), _keepalive_timeout(keepalive_timeout)
00469                 {
00470                 }
00471 
00472                 TCPConnection::Sender::~Sender()
00473                 {
00474                 }
00475 
00476                 bool TCPConnection::Sender::__cancellation()
00477                 {
00478                         // cancel the main thread in here
00479                         ibrcommon::Queue<dtn::data::BundleID>::abort();
00480 
00481                         return true;
00482                 }
00483 
00484                 void TCPConnection::Sender::run()
00485                 {
00486                         try {
00487                                 dtn::core::BundleStorage &storage = dtn::core::BundleCore::getInstance().getStorage();
00488 
00489                                 while (_connection.good())
00490                                 {
00491                                         try {
00492                                                 _current_transfer = ibrcommon::Queue<dtn::data::BundleID>::getnpop(true, _keepalive_timeout);
00493 
00494                                                 try {
00495                                                         // read the bundle out of the storage
00496                                                         dtn::data::Bundle bundle = storage.get(_current_transfer);
00497 
00498 #ifdef WITH_BUNDLE_SECURITY
00499                                                         const dtn::daemon::Configuration::Security::Level seclevel =
00500                                                                         dtn::daemon::Configuration::getInstance().getSecurity().getLevel();
00501 
00502                                                         if (seclevel & dtn::daemon::Configuration::Security::SECURITY_LEVEL_AUTHENTICATED)
00503                                                         {
00504                                                                 try {
00505                                                                         dtn::security::SecurityManager::getInstance().auth(bundle);
00506                                                                 } catch (const dtn::security::SecurityManager::KeyMissingException&) {
00507                                                                         // sign requested, but no key is available
00508                                                                         IBRCOMMON_LOGGER(warning) << "No key available for sign process." << IBRCOMMON_LOGGER_ENDL;
00509                                                                 }
00510                                                         }
00511 #endif
00512                                                         // send bundle
00513                                                         _connection << bundle;
00514                                                 } catch (const dtn::core::BundleStorage::NoBundleFoundException&) {
00515                                                         // send transfer aborted event
00516                                                         TransferAbortedEvent::raise(_connection._node.getEID(), _current_transfer, dtn::net::TransferAbortedEvent::REASON_BUNDLE_DELETED);
00517                                                 }
00518                                                 
00519                                                 // unset the current transfer
00520                                                 _current_transfer = dtn::data::BundleID();
00521                                                 
00522                                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00523                                                 switch (ex.reason)
00524                                                 {
00525                                                         case ibrcommon::QueueUnblockedException::QUEUE_ERROR:
00526                                                         case ibrcommon::QueueUnblockedException::QUEUE_ABORT:
00527                                                                 throw;
00528                                                         case ibrcommon::QueueUnblockedException::QUEUE_TIMEOUT:
00529                                                         {
00530                                                                 // send a keepalive
00531                                                                 _connection.keepalive();
00532                                                         }
00533                                                 }
00534                                         }
00535 
00536                                         // idle a little bit
00537                                         yield();
00538                                 }
00539                         } catch (const ibrcommon::QueueUnblockedException &ex) {
00540                                 IBRCOMMON_LOGGER_DEBUG(50) << "TCPConnection::Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
00541                                 return;
00542                         } catch (const std::exception &ex) {
00543                                 IBRCOMMON_LOGGER_DEBUG(10) << "TCPConnection::Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
00544                         }
00545 
00546                         _connection.stop();
00547                 }
00548 
00549                 void TCPConnection::clearQueue()
00550                 {
00551                         // requeue all bundles still queued
00552                         try {
00553                                 while (true)
00554                                 {
00555                                         const dtn::data::BundleID id = _sender.getnpop();
00556 
00557                                         // raise transfer abort event for all bundles without an ACK
00558                                         dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00559                                 }
00560                         } catch (const ibrcommon::QueueUnblockedException&) {
00561                                 // queue emtpy
00562                         }
00563 
00564                         // requeue all bundles still in transit
00565                         try {
00566                                 while (true)
00567                                 {
00568                                         const dtn::data::BundleID id = _sentqueue.getnpop();
00569 
00570                                         if (_lastack > 0)
00571                                         {
00572                                                 // some data are already acknowledged, make a fragment?
00573                                                 //TODO: make a fragment
00574                                                 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00575                                         }
00576                                         else
00577                                         {
00578                                                 // raise transfer abort event for all bundles without an ACK
00579                                                 dtn::routing::RequeueBundleEvent::raise(_node.getEID(), id);
00580                                         }
00581 
00582                                         // set last ack to zero
00583                                         _lastack = 0;
00584                                 }
00585                         } catch (const ibrcommon::QueueUnblockedException&) {
00586                                 // queue emtpy
00587                         }
00588                 }
00589                 
00590 #ifdef WITH_TLS
00591                 void dtn::net::TCPConnection::enableTLS()
00592                 {
00593                         _flags |= dtn::streams::StreamContactHeader::REQUEST_TLS;
00594                 }
00595 #endif
00596 
00597                 void TCPConnection::keepalive()
00598                 {
00599                         _stream.keepalive();
00600                 }
00601 
00602                 bool TCPConnection::good() const
00603                 {
00604                         return _tcpstream->good();
00605                 }
00606 
00607                 void TCPConnection::Sender::finally()
00608                 {
00609                         // notify the aborted transfer of the last bundle
00610                         if (_current_transfer != dtn::data::BundleID())
00611                         {
00612                                 dtn::routing::RequeueBundleEvent::raise(_connection._node.getEID(), _current_transfer);
00613                         }
00614                 }
00615 
00616                 bool TCPConnection::match(const dtn::core::Node &n) const
00617                 {
00618                         return (_node == n);
00619                 }
00620 
00621                 bool TCPConnection::match(const dtn::data::EID &destination) const
00622                 {
00623                         return (_node.getEID() == destination.getNode());
00624                 }
00625 
00626                 bool TCPConnection::match(const NodeEvent &evt) const
00627                 {
00628                         const dtn::core::Node &n = evt.getNode();
00629                         return match(n);
00630                 }
00631         }
00632 }