34 #include <ibrcommon/net/socket.h>
35 #include <ibrcommon/TimeMeasurement.h>
36 #include <ibrcommon/net/vinterface.h>
37 #include <ibrcommon/thread/Conditional.h>
38 #include <ibrcommon/thread/RWLock.h>
39 #include <ibrcommon/Logger.h>
47 #include <openssl/x509.h>
48 #include <ibrcommon/TLSExceptions.h>
49 #include <ibrcommon/ssl/TLSStream.h>
56 const std::string TCPConnection::TAG =
"TCPConnection";
62 : _peer(), _node(node), _socket(sock), _socket_stream(NULL), _sec_stream(NULL), _protocol_stream(NULL), _sender(*this),
63 _keepalive_sender(*this, _keepalive_timeout), _timeout(timeout), _lastack(0), _resume_offset(0), _keepalive_timeout(0),
64 _callback(tcpsrv), _flags(0), _aborted(false)
71 _keepalive_sender.join();
78 ibrcommon::RWLock l(_protocol_stream_mutex);
79 delete _protocol_stream;
80 _protocol_stream = NULL;
83 if (_sec_stream != NULL) {
87 if ((_socket != NULL) && (_socket_stream == NULL)) {
89 }
else if (_socket_stream != NULL) {
90 delete _socket_stream;
111 (*getProtocolStream()).reject();
126 }
catch (
const ibrcommon::ThreadException &ex) {
127 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
143 ibrcommon::TLSStream &tls =
dynamic_cast<ibrcommon::TLSStream&
>(*_sec_stream);
144 X509 *peer_cert = tls.activate();
157 if (weak_cn.find_first_of(
"//") == 0) {
158 weak_cn = weak_cn.substr(2, weak_cn.length() - 2);
168 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
169 throw ibrcommon::TLSCertificateVerificationException(ex.what());
171 }
catch (
const std::exception&) {
174 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) <<
"TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL;
177 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) <<
"TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL;
184 throw ibrcommon::TLSException(
"TLS not supported by peer.");
186 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, notice) <<
"TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL;
199 _node =
Node(header._localeid);
208 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) <<
"connection to local endpoint rejected" << IBRCOMMON_LOGGER_ENDL;
212 _keepalive_timeout = header._keepalive * 1000;
216 initiateExtendedHandshake();
217 }
catch (
const ibrcommon::Exception &ex) {
218 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
227 timerclear(&timeout);
230 if (_peer._keepalive > 0)
232 timeout.tv_sec = header._keepalive * 2;
236 _socket_stream->setTimeout(timeout);
241 if (_idle_timeout > 0)
243 (*getProtocolStream()).enableIdleTimeout(_idle_timeout);
245 }
catch (
const ibrcommon::Exception&) {};
253 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 40) <<
"eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
257 _keepalive_sender.stop();
261 }
catch (
const ibrcommon::ThreadException &ex) {
262 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
274 ibrcommon::Queue<dtn::net::BundleTransfer>::Locked l = _sentqueue.exclusive();
278 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) <<
"transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
297 ibrcommon::Queue<dtn::net::BundleTransfer>::Locked l = _sentqueue.exclusive();
301 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) <<
"transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
325 _callback.addTrafficIn(amount);
330 _callback.addTrafficOut(amount);
338 }
catch (
const ibrcommon::ThreadException &ex) {
339 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
348 }
catch (
const ibrcommon::Exception&) {};
352 ibrcommon::DetachedThread::stop();
353 }
catch (
const ibrcommon::ThreadException &ex) {
354 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) <<
"shutdown failed (" << ex.what() <<
")" << IBRCOMMON_LOGGER_ENDL;
364 if (_socket_stream != NULL) _socket_stream->close();
369 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 60) <<
"TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
373 _keepalive_sender.stop();
377 }
catch (
const std::exception&) { };
380 if (_socket_stream != NULL) _socket_stream->close();
383 _callback.connectionDown(
this);
384 }
catch (
const ibrcommon::MutexException&) { };
401 void TCPConnection::__setup_socket(ibrcommon::clientsocket *sock,
bool server)
405 sock->set(ibrcommon::clientsocket::NO_DELAY,
true);
408 _socket_stream =
new ibrcommon::socketstream(sock);
412 timerclear(&timeout);
413 timeout.tv_sec = _timeout;
414 _socket_stream->setTimeout(timeout);
418 _sec_stream =
new ibrcommon::TLSStream(_socket_stream);
419 if (server)
dynamic_cast<ibrcommon::TLSStream&
>(*_sec_stream).setServer(
true);
420 else dynamic_cast<ibrcommon::TLSStream&
>(*_sec_stream).setServer(
false);
426 ibrcommon::RWLock l(_protocol_stream_mutex);
427 if (_protocol_stream != NULL)
delete _protocol_stream;
429 _protocol_stream->exceptions(std::ios::badbit | std::ios::eofbit);
435 if (_socket != NULL)
return;
438 if (_socket_stream != NULL)
return;
441 std::string address =
"0.0.0.0";
442 unsigned int port = 0;
448 for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); ++iter)
451 if (_aborted)
throw ibrcommon::socket_exception(
"connection has been aborted");
456 uri.
decode(address, port);
459 ibrcommon::vaddress addr(address, port);
461 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 15) <<
"Initiate TCP connection to " << address <<
":" << port << IBRCOMMON_LOGGER_ENDL;
466 tv.tv_sec = _timeout;
469 ibrcommon::tcpsocket *client =
new ibrcommon::tcpsocket(addr, &tv);
476 __setup_socket(client,
false);
484 }
catch (
const ibrcommon::socket_exception&) {
487 }
catch (
const ibrcommon::socket_exception&) { };
491 throw ibrcommon::socket_exception(
"no address available to connect");
493 }
catch (
const ibrcommon::socket_exception&) {
495 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) <<
"connection to " << _node.
toString() <<
" failed" << IBRCOMMON_LOGGER_ENDL;
498 }
catch (
const ibrcommon::Exception&) {};
500 }
catch (
const bad_cast&) { };
506 if (_socket == NULL) {
511 __setup_socket(_socket,
true);
514 TCPConnection::safe_streamconnection sc = getProtocolStream();
515 std::iostream &stream = (*sc);
524 _keepalive_sender.start();
541 if (!stream.good())
throw ibrcommon::IOException(
"stream went bad");
547 deserializer >> bundle;
550 if ( ( bundle.destination ==
EID() ) || ( bundle.source ==
EID() ) )
561 case BundleFilter::ACCEPT:
566 case BundleFilter::REJECT:
570 case BundleFilter::DROP:
580 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 2) <<
"bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
587 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 2) <<
"invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
592 }
catch (
const ibrcommon::ThreadException &ex) {
593 IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) <<
"failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
596 }
catch (
const ibrcommon::Exception&) {};
597 }
catch (
const std::exception &ex) {
598 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) <<
"run(): std::exception (" << ex.what() <<
")" << IBRCOMMON_LOGGER_ENDL;
601 }
catch (
const ibrcommon::Exception&) {};
605 TCPConnection::safe_streamconnection TCPConnection::getProtocolStream() throw (ibrcommon::Exception)
607 return safe_streamconnection(_protocol_stream, _protocol_stream_mutex);
610 TCPConnection::KeepaliveSender::KeepaliveSender(
TCPConnection &connection,
size_t &keepalive_timeout)
611 : _connection(connection), _keepalive_timeout(keepalive_timeout)
616 TCPConnection::KeepaliveSender::~KeepaliveSender()
620 void TCPConnection::KeepaliveSender::run() throw ()
623 ibrcommon::MutexLock l(_wait);
627 _wait.wait(_keepalive_timeout);
628 }
catch (
const ibrcommon::Conditional::ConditionalAbortException &ex) {
629 if (ex.reason == ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
632 _connection.keepalive();
640 }
catch (
const std::exception&) { };
643 void TCPConnection::KeepaliveSender::__cancellation() throw ()
645 ibrcommon::MutexLock l(_wait);
650 : _connection(connection)
654 TCPConnection::Sender::~Sender()
658 void TCPConnection::Sender::__cancellation() throw ()
661 ibrcommon::Queue<dtn::net::BundleTransfer>::abort();
664 void TCPConnection::Sender::run() throw ()
669 TCPConnection::safe_streamconnection sc = _connection.getProtocolStream();
670 std::iostream &stream = (*sc);
674 context.
setPeer(_connection._peer._localeid);
675 context.
setProtocol(_connection._callback.getDiscoveryProtocol());
680 while (stream.good())
696 case BundleFilter::ACCEPT:
698 case BundleFilter::REJECT:
699 case BundleFilter::DROP:
713 _connection._resume_offset = 0;
717 _connection._sentqueue.push(transfer);
721 if (!stream.good())
throw ibrcommon::IOException(
"stream went bad");
723 if (_connection._resume_offset > 0)
725 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 4) <<
"Resume transfer of bundle " << bundle.
toString() <<
" to " << _connection.getNode().getEID().getString() <<
", offset: " << _connection._resume_offset << IBRCOMMON_LOGGER_ENDL;
733 serializer << bundle;
737 stream << std::flush;
738 }
catch (
const ibrcommon::Exception &ex) {
740 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) <<
"connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
753 }
catch (
const ibrcommon::QueueUnblockedException &ex) {
754 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 50) <<
"Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
756 }
catch (
const std::exception &ex) {
757 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) <<
"Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
766 ibrcommon::Queue<dtn::net::BundleTransfer>::Locked l = _sentqueue.exclusive();
789 void dtn::net::TCPConnection::enableTLS()
802 return _socket_stream->good();
805 void TCPConnection::Sender::finally() throw ()
static Configuration & getInstance(bool reset=false)
std::string toString() const
void decode(std::string &address, unsigned int &port) const
void initiateExtendedHandshake()
static void raise(State, const dtn::core::Node &)
static dtn::data::EID local
const Configuration::Security & getSecurity() const
bool sameHost(const std::string &other) const
void setBundle(const dtn::data::Bundle &data)
bool getBit(E flag) const
dtn::core::Node::Protocol getDiscoveryProtocol() const
const dtn::core::Node & getNode() const
bool match(const dtn::core::Node &n) const
virtual void eventTimeout()
std::string getHost() const
virtual void addTrafficIn(size_t)
bool TLSRequired() const
Checks if TLS is required.
static void setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset)
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
virtual void eventBundleForwarded()
void setProtocol(const dtn::core::Node::Protocol &protocol)
void queue(const dtn::net::BundleTransfer &job)
const Configuration::Network & getNetwork() const
bool get(FLAGS flag) const
void setPeer(const dtn::data::EID &endpoint)
const dtn::data::EID & getNeighbor() const
virtual void eventBundleRefused()
virtual void addTrafficOut(size_t)
void setFragmentationSupport(bool val)
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
std::list< URI > get(Node::Protocol proto) const
static std::string toString(const Node::Type type)
void abort(const TransferAbortedEvent::AbortReason reason)
virtual void eventConnectionDown()
const dtn::data::MetaBundle & getBundle() const
std::string getString() const
virtual void initialize()
void rejectTransmission()
const dtn::data::EID & getEID() const
const Node & getNode() const
dtn::data::Length getTCPChunkSize() const
dtn::storage::BundleStorage & getStorage()
virtual void eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases csc)
virtual void eventConnectionUp(const dtn::streams::StreamContactHeader &header)
dtn::data::Timeout getTCPIdleTimeout() const
static dtn::data::Length getOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id)
static void validateSubject(X509 *certificate, const std::string &cn)
Validates if the CommonName in the given X509 certificate corresponds to the given EID...
const dtn::streams::StreamContactHeader & getHeader() const
virtual void eventBundleAck(const dtn::data::Length &ack)
virtual void eventError()
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, ibrcommon::clientsocket *sock, const size_t timeout=10)
static BundleCore & getInstance()