IBR-DTN  1.0.0
TCPConnection.cpp
Go to the documentation of this file.
1 /*
2  * TCPConnection.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "config.h"
23 #include "Configuration.h"
24 #include "core/BundleCore.h"
25 #include "core/BundleEvent.h"
26 #include "storage/BundleStorage.h"
27 #include "core/FragmentManager.h"
28 
31 #include "net/ConnectionEvent.h"
33 
34 #include <ibrcommon/net/socket.h>
35 #include <ibrcommon/TimeMeasurement.h>
36 #include <ibrcommon/net/vinterface.h>
37 #include <ibrcommon/thread/Conditional.h>
38 #include <ibrcommon/thread/RWLock.h>
39 #include <ibrcommon/Logger.h>
40 
41 #include <iostream>
42 #include <iomanip>
43 #include <memory>
44 
45 #ifdef WITH_TLS
47 #include <openssl/x509.h>
48 #include <ibrcommon/TLSExceptions.h>
49 #include <ibrcommon/ssl/TLSStream.h>
50 #endif
51 
52 namespace dtn
53 {
54  namespace net
55  {
// Tag string handed to the IBRCOMMON_LOGGER_* macros by the methods in this file.
56  const std::string TCPConnection::TAG = "TCPConnection";
57 
58  /*
59  * class TCPConnection
60  */
// Creates a connection object for the given node.
//  tcpsrv  - stored as _callback; later used for traffic accounting and connectionDown()
//  node    - initial node description (replaced in the connection-up handler)
//  sock    - stored as-is in _socket; run() connects actively when it is NULL and
//            wraps the given socket otherwise
//  timeout - initial time-out, used for the socket stream and the handshake
// All stream pointers start as NULL; the socket stream is created later by __setup_socket().
// NOTE(review): _keepalive_sender is handed _keepalive_timeout in this list before that
// member is set to 0; KeepaliveSender takes it as size_t& - confirm it only keeps a
// reference and does not read the value during construction.
61  TCPConnection::TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, ibrcommon::clientsocket *sock, const size_t timeout)
62  : _peer(), _node(node), _socket(sock), _socket_stream(NULL), _sec_stream(NULL), _protocol_stream(NULL), _sender(*this),
63  _keepalive_sender(*this, _keepalive_timeout), _timeout(timeout), _lastack(0), _resume_offset(0), _keepalive_timeout(0),
64  _callback(tcpsrv), _flags(0), _aborted(false)
65  {
66  }
67 
// NOTE(review): the signature line (listing line 68) is missing from this extract. The
// body joins both helper threads and deletes the streams, so it is presumably the
// destructor - confirm against the original file.
69  {
70  // join the keepalive sender thread
71  _keepalive_sender.join();
72 
73  // wait until the sender thread is finished
74  _sender.join();
75 
76  // clean-up: the protocol stream is deleted inside a scope that holds an
    // RWLock object built on _protocol_stream_mutex
77  {
78  ibrcommon::RWLock l(_protocol_stream_mutex);
79  delete _protocol_stream;
80  _protocol_stream = NULL;
81  }
82 
83  if (_sec_stream != NULL) {
84  delete _sec_stream;
85  }
86 
    // the raw socket is deleted only if no socket stream was ever created for it;
    // otherwise the socket stream is deleted instead
    // NOTE(review): presumably the socket stream owns its socket - confirm
87  if ((_socket != NULL) && (_socket_stream == NULL)) {
88  delete _socket;
89  } else if (_socket_stream != NULL) {
90  delete _socket_stream;
91  }
92  }
93 
// NOTE(review): signature line (listing line 94) is missing from this extract.
// Hands the transfer job `job` to the sender object via its push().
95  {
96  _sender.push(job);
97  }
98 
// NOTE(review): signature line (listing line 99) is missing from this extract.
// Returns _peer, the value stored from the contact header in the connection-up handler.
100  {
101  return _peer;
102  }
103 
// NOTE(review): signature line (listing line 104) is missing from this extract.
// Returns the node object currently associated with this connection.
105  {
106  return _node;
107  }
108 
// NOTE(review): signature line (listing line 109) is missing from this extract.
// Calls reject() on the protocol stream obtained through getProtocolStream().
110  {
111  (*getProtocolStream()).reject();
112  }
113 
// NOTE(review): signature line (listing line 114) is missing from this extract, so
// the handler this belongs to cannot be confirmed here. The body is empty.
115  {
116  }
117 
// NOTE(review): the signature line (listing line 118) and listing line 121 are
// missing from this extract. The visible part stops this object's own thread.
119  {
120  // event
    // NOTE(review): listing line 121 is missing here; presumably an event is raised - confirm
122 
123  try {
124  // stop the receiver thread
125  this->stop();
126  } catch (const ibrcommon::ThreadException &ex) {
    // a failing stop() is only logged, not propagated
127  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
128  }
129  }
130 
// NOTE(review): signature line (listing line 131) is missing from this extract, so
// the handler this belongs to cannot be confirmed here. The body is empty.
132  {
133  }
134 
// Performs the optional TLS part of the handshake. Without WITH_TLS the body is empty.
// Visible failure paths: a certificate rejection is logged and rethrown as
// ibrcommon::TLSCertificateVerificationException; a peer without TLS support raises
// ibrcommon::TLSException when the configuration reports TLSRequired().
// NOTE(review): listing lines 139-140, 151, 162, 172 and 185 are missing from this
// extract. The outer condition, the certificate checks inside the two inner try
// blocks and parts of the error branches are therefore not visible, and the braces
// below do not balance as shown.
135  void TCPConnection::initiateExtendedHandshake() throw (ibrcommon::Exception)
136  {
137 #ifdef WITH_TLS
138  /* if both nodes support TLS, activate it */
141  {
142  try{
143  ibrcommon::TLSStream &tls = dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream);
144  X509 *peer_cert = tls.activate();
145 
146  // check the full EID first
147  const std::string cn = _peer.getEID().getString();
148 
149  try {
150  try {
    // NOTE(review): listing line 151 (the check against `cn`) is missing here
152  } catch (const dtn::security::SecurityCertificateException &ex) {
153  // check using the hostname
154  std::string weak_cn = _peer.getEID().getHost();
155 
156  // strip leading "//"
    // NOTE(review): find_first_of("//") looks for any character of the set, i.e. a
    // single '/', so a host starting with just one '/' also loses two characters;
    // a prefix comparison against "//" would match the comment above.
157  if (weak_cn.find_first_of("//") == 0) {
158  weak_cn = weak_cn.substr(2, weak_cn.length() - 2);
159  }
160 
161  try {
    // NOTE(review): listing line 162 (the check against `weak_cn`) is missing here
163  } catch (const dtn::security::SecurityCertificateException &ex_weak) {
    // the fallback failed as well: report the first (full EID) failure
164  throw ex;
165  }
166  }
167  } catch (const dtn::security::SecurityCertificateException &ex) {
168  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
169  throw ibrcommon::TLSCertificateVerificationException(ex.what());
170  }
171  } catch (const std::exception&) {
    // NOTE(review): listing line 172 is missing; it presumably holds the condition
    // that selects between the two branches below - confirm
173  /* close the connection */
174  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) << "TLS failed, closing the connection." << IBRCOMMON_LOGGER_ENDL;
175  throw;
176  } else {
177  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 20) << "TLS failed, continuing unauthenticated." << IBRCOMMON_LOGGER_ENDL;
178  }
179  }
180  } else {
181  /* TLS not supported by both Nodes, check if its required */
182  if (dtn::daemon::Configuration::getInstance().getSecurity().TLSRequired()){
183  /* close the connection */
184  throw ibrcommon::TLSException("TLS not supported by peer.");
    // NOTE(review): listing line 185 is missing; presumably an else branch starts here
186  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, notice) << "TLS not supported by peer. Continuing without TLS." << IBRCOMMON_LOGGER_ENDL;
187  }
188  /* else: this node does not support TLS, should have already printed a warning */
189  }
190 #endif
191  }
192 
// NOTE(review): the signature line (listing line 193) and listing lines 240 and 248
// are missing from this extract. The body reads a contact header named `header`.
// Steps visible here: remember the header, rebuild _node from the peer's EID, refuse
// connections to ourselves, run the extended handshake, then set the socket time-out
// and the idle time-out.
194  {
195  _peer = header;
196 
197  // copy old attributes and urls to the new node object
198  Node n_old = _node;
199  _node = Node(header._localeid);
200  _node += n_old;
201 
202  // check if the peer has the same EID
203  if (_node.getEID() == dtn::core::BundleCore::getInstance().local)
204  {
205  // abort the connection
206  shutdown();
207 
208  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "connection to local endpoint rejected" << IBRCOMMON_LOGGER_ENDL;
209  return;
210  }
211 
    // NOTE(review): presumably converts the header's keepalive from seconds to
    // milliseconds for the keepalive sender's wait() - confirm the units
212  _keepalive_timeout = header._keepalive * 1000;
213 
214  try {
215  // initiate extended handshake (e.g. TLS)
216  initiateExtendedHandshake();
217  } catch (const ibrcommon::Exception &ex) {
218  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
219 
220  // abort the connection
221  shutdown();
222  return;
223  }
224 
225  // set the timer
226  timeval timeout;
227  timerclear(&timeout);
228 
229  // set the incoming timer if set (> 0): twice the peer's keepalive value;
    // with a keepalive of 0 the cleared (all-zero) timeval is passed on
230  if (_peer._keepalive > 0)
231  {
232  timeout.tv_sec = header._keepalive * 2;
233  }
234 
235  // change time-out
236  _socket_stream->setTimeout(timeout);
237 
238  try {
239  // enable idle timeout
    // NOTE(review): listing line 240 is missing; _idle_timeout is presumably
    // defined there - confirm
241  if (_idle_timeout > 0)
242  {
243  (*getProtocolStream()).enableIdleTimeout(_idle_timeout);
244  }
    // failures while enabling the idle time-out are ignored
245  } catch (const ibrcommon::Exception&) {};
246 
247  // raise up event
    // NOTE(review): listing line 248 (the statement raising the event) is missing here
249  }
250 
// NOTE(review): the signature line (listing line 251) and listing line 268 are
// missing from this extract. The debug message below names this handler
// "eventConnectionDown()". It stops both helper threads.
252  {
253  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 40) << "eventConnectionDown()" << IBRCOMMON_LOGGER_ENDL;
254 
255  try {
256  // shutdown the keepalive sender thread
257  _keepalive_sender.stop();
258 
259  // stop the sender
260  _sender.stop();
261  } catch (const ibrcommon::ThreadException &ex) {
    // thread errors are logged only
262  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
263  }
264 
    // only act if a peer EID is known, i.e. _peer._localeid differs from a
    // default-constructed EID
265  if (_peer._localeid != dtn::data::EID())
266  {
267  // event
    // NOTE(review): listing line 268 (the statement raising the event) is missing here
269  }
270  }
271 
// NOTE(review): the signature line (listing line 272) and listing line 286 are
// missing from this extract. Going by the log text this handles a refused transfer:
// it takes the oldest entry of the sent queue, resets the ACK counter and removes it.
273  {
    // exclusive access to the sent queue for the rest of this scope
274  ibrcommon::Queue<dtn::net::BundleTransfer>::Locked l = _sentqueue.exclusive();
275 
276  // stop here if the queue is already empty
277  if (l.empty()) {
278  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "transfer refused without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
279  return;
280  }
281 
282  // get the job on top of the sent queue
283  dtn::net::BundleTransfer &job = l.front();
284 
285  // abort the transmission
    // NOTE(review): listing line 286 (the call that aborts `job`) is missing here
287 
288  // set ACK to zero
289  _lastack = 0;
290 
291  // release the job
292  l.pop();
293  }
294 
// NOTE(review): the signature line (listing line 295) is missing from this extract.
// Going by the log text this handles a completed transfer: the oldest entry of the
// sent queue is marked complete, the ACK counter is reset and the entry is removed.
296  {
    // exclusive access to the sent queue for the rest of this scope
297  ibrcommon::Queue<dtn::net::BundleTransfer>::Locked l = _sentqueue.exclusive();
298 
299  // stop here if the queue is already empty
300  if (l.empty()) {
301  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "transfer completed without a bundle in queue" << IBRCOMMON_LOGGER_ENDL;
302  return;
303  }
304 
305  // get the job on top of the sent queue
306  dtn::net::BundleTransfer &job = l.front();
307 
308  // mark job as complete
309  job.complete();
310 
311  // set ACK to zero
312  _lastack = 0;
313 
314  // release the job
315  l.pop();
316  }
317 
// NOTE(review): signature line (listing line 318) is missing from this extract.
// Stores the acknowledged amount `ack` in _lastack; the queue clean-up code later
// passes _lastack to the fragment manager.
319  {
320  _lastack = ack;
321  }
322 
// Forwards the amount of received traffic to the callback object (_callback).
//  amount - value passed through unchanged
323  void TCPConnection::addTrafficIn(size_t amount) throw ()
324  {
325  _callback.addTrafficIn(amount);
326  }
327 
// Forwards the amount of sent traffic to the callback object (_callback).
//  amount - value passed through unchanged
328  void TCPConnection::addTrafficOut(size_t amount) throw ()
329  {
330  _callback.addTrafficOut(amount);
331  }
332 
// NOTE(review): signature line (listing line 333) is missing from this extract.
// Starts this object's thread; a failure to start is logged and not propagated.
334  {
335  // start the receiver for incoming bundles + handshake
336  try {
337  start();
338  } catch (const ibrcommon::ThreadException &ex) {
339  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << ex.what() << IBRCOMMON_LOGGER_ENDL;
340  }
341  }
342 
// Shuts the connection down and stops the connection thread. Declared throw():
// exceptions from both steps are caught here.
// NOTE(review): listing line 347 (the actual shutdown statement inside the first
// try block) is missing from this extract.
343  void TCPConnection::shutdown() throw ()
344  {
345  try {
346  // shutdown
    // NOTE(review): listing line 347 is missing here
    // any ibrcommon::Exception from this step is ignored
348  } catch (const ibrcommon::Exception&) {};
349 
350  try {
351  // abort the connection thread
352  ibrcommon::DetachedThread::stop();
353  } catch (const ibrcommon::ThreadException &ex) {
354  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "shutdown failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
355  }
356  }
357 
// NOTE(review): signature line (listing line 358) is missing from this extract.
// Flags the connection as aborted (connect() checks _aborted between connection
// attempts) and closes the socket stream if one exists.
359  {
360  // mark the connection as aborted
361  _aborted = true;
362 
363  // close the stream
364  if (_socket_stream != NULL) _socket_stream->close();
365  }
366 
// Tear-down hook of the connection thread: stops both helper threads, closes the
// socket stream, reports the connection as down to the callback object and finally
// runs clearQueue(). Declared throw(); the exception types named below are swallowed.
367  void TCPConnection::finally() throw ()
368  {
369  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 60) << "TCPConnection down" << IBRCOMMON_LOGGER_ENDL;
370 
371  try {
372  // halt the keepalive sender thread first, then the sender thread
373  _keepalive_sender.stop();
374  _sender.stop();
375  } catch (const std::exception&) { }
376 
377  // make sure the tcp stream is closed, if one was created
378  if (_socket_stream != NULL)
379  {
380  _socket_stream->close();
381  }
382 
383  try {
384  _callback.connectionDown(this);
385  } catch (const ibrcommon::MutexException&) { }
386 
387  clearQueue(); // empty the queue of sent-but-unfinished transfers
388  }
389 
// Set-up hook, declared throw().
// NOTE(review): listing lines 392-393 and 397 are missing from this extract, so the
// statements executed here, including the one guarded by the fragmentation setting,
// are not visible.
390  void TCPConnection::setup() throw ()
391  {
394 
    // something is done only when fragmentation is enabled in the network configuration
395  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation())
396  {
    // NOTE(review): listing line 397 is missing here
398  }
399  }
400 
// Builds the stream stack on top of a connected socket: socket stream, optional TLS
// stream (WITH_TLS only) and the protocol stream that the rest of the class uses.
//  sock   - socket to wrap; NO_DELAY is set on it when the configuration asks for it
//  server - passed on to TLSStream::setServer() when TLS is compiled in
// NOTE(review): listing line 424 is missing from this extract; `chunksize`, used in
// the StreamConnection constructor below, is presumably defined there.
401  void TCPConnection::__setup_socket(ibrcommon::clientsocket *sock, bool server)
402  {
403  if ( dtn::daemon::Configuration::getInstance().getNetwork().getTCPOptionNoDelay() )
404  {
405  sock->set(ibrcommon::clientsocket::NO_DELAY, true);
406  }
407 
408  _socket_stream = new ibrcommon::socketstream(sock);
409 
410  // set an initial time-out
411  timeval timeout;
412  timerclear(&timeout);
413  timeout.tv_sec = _timeout;
414  _socket_stream->setTimeout(timeout);
415 
416 #ifdef WITH_TLS
417  // initialize security layer if available
418  _sec_stream = new ibrcommon::TLSStream(_socket_stream);
419  if (server) dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream).setServer(true);
420  else dynamic_cast<ibrcommon::TLSStream&>(*_sec_stream).setServer(false);
421 #endif
422 
423  // create a new stream connection
    // NOTE(review): listing line 424 is missing here
425 
    // replace any previous protocol stream under the stream mutex; the new stream
    // runs on the TLS stream when one exists, otherwise directly on the socket stream
426  ibrcommon::RWLock l(_protocol_stream_mutex);
427  if (_protocol_stream != NULL) delete _protocol_stream;
428  _protocol_stream = new dtn::streams::StreamConnection(*this, (_sec_stream == NULL) ? *_socket_stream : *_sec_stream, chunksize);
    // badbit and eofbit on the protocol stream are reported as exceptions
429  _protocol_stream->exceptions(std::ios::badbit | std::ios::eofbit);
430  }
431 
// NOTE(review): the signature line (listing line 432) and listing line 497 are
// missing from this extract. run() calls connect() when no accepted socket exists.
// Walks through the node's CONN_TCPIP URIs and returns after the first successful
// connection. If none succeeds, or the connection was aborted, a socket_exception
// is logged and rethrown to the caller.
433  {
434  // do not connect to other hosts if we are in server
435  if (_socket != NULL) return;
436 
437  // do not connect to anyone if we are already connected
438  if (_socket_stream != NULL) return;
439 
440  // variables for address and port
441  std::string address = "0.0.0.0";
442  unsigned int port = 0;
443 
444  // try to connect to the other side
445  try {
446  const std::list<dtn::core::Node::URI> uri_list = _node.get(dtn::core::Node::CONN_TCPIP);
447 
448  for (std::list<dtn::core::Node::URI>::const_iterator iter = uri_list.begin(); iter != uri_list.end(); ++iter)
449  {
450  // break-out if the connection has been aborted
451  if (_aborted) throw ibrcommon::socket_exception("connection has been aborted");
452 
453  try {
454  // decode address and port
455  const dtn::core::Node::URI &uri = (*iter);
456  uri.decode(address, port);
457 
458  // create a virtual address to connect to
459  ibrcommon::vaddress addr(address, port);
460 
461  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 15) << "Initiate TCP connection to " << address << ":" << port << IBRCOMMON_LOGGER_ENDL;
462 
463  // create a new tcpsocket
464  timeval tv;
465  timerclear(&tv);
466  tv.tv_sec = _timeout;
467 
468  // create a new tcp connection via the tcpsocket object
469  ibrcommon::tcpsocket *client = new ibrcommon::tcpsocket(addr, &tv);
470 
471  try {
472  // connect to the node
473  client->up();
474 
475  // setup a new tcp connection
476  __setup_socket(client, false);
477 
478  // add TCP connection descriptor to the node object
479  _node.clear();
480  _node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri.value, 0, 10) );
481 
482  // connection successful
483  return;
484  } catch (const ibrcommon::socket_exception&) {
    // this address failed: release the socket and try the next URI
    // NOTE(review): `client` is released only for socket_exception; any other
    // exception type leaving the inner try would leak it - confirm
485  delete client;
486  }
487  } catch (const ibrcommon::socket_exception&) { };
488  }
489 
490  // no connection has been established
491  throw ibrcommon::socket_exception("no address available to connect");
492 
493  } catch (const ibrcommon::socket_exception&) {
494  // error on open, requeue all bundles in the queue
495  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, warning) << "connection to " << _node.toString() << " failed" << IBRCOMMON_LOGGER_ENDL;
496  try {
    // NOTE(review): listing line 497 is missing here
498  } catch (const ibrcommon::Exception&) {};
499  throw;
    // a bad_cast is swallowed silently
500  } catch (const bad_cast&) { };
501  }
502 
// Main loop of the connection thread: establishes the stream (active connect or
// wrap of an accepted socket), performs the handshake, starts the sender and
// keepalive threads and then reads bundles until the protocol stream reports EOF
// or an exception ends the loop. Declared throw(); the handlers at the end catch
// ibrcommon::ThreadException and std::exception.
// NOTE(review): listing lines 532, 544, 558, 574, 577, 584, 595 and 600 are missing
// from this extract. `deserializer` and `ret` are defined on missing lines and the
// first catch clause of the inner try has lost its header.
503  void TCPConnection::run() throw ()
504  {
505  try {
506  if (_socket == NULL) {
507  // connect to the peer
508  connect();
509  } else {
510  // accept remote connection as server
511  __setup_socket(_socket, true);
512  }
513 
514  TCPConnection::safe_streamconnection sc = getProtocolStream();
515  std::iostream &stream = (*sc);
516 
517  // do the handshake
518  (*sc).handshake(dtn::core::BundleCore::local, _timeout, _flags);
519 
520  // start the sender
521  _sender.start();
522 
523  // start keepalive sender
524  _keepalive_sender.start();
525 
526  // create a filter context
527  dtn::core::FilterContext context;
528  context.setPeer(_peer._localeid);
529  context.setProtocol(_callback.getDiscoveryProtocol());
530 
531  // create a deserializer for next bundle
    // NOTE(review): listing line 532 (definition of `deserializer`) is missing here
533 
534  while (!(*sc).eof())
535  {
536  try {
537  // create a new empty bundle
538  dtn::data::Bundle bundle;
539 
540  // check if the stream is still good
541  if (!stream.good()) throw ibrcommon::IOException("stream went bad");
542 
543  // enable/disable fragmentation support according to the contact header.
    // NOTE(review): listing line 544 is missing here
545 
546  // read the bundle (or the fragment if fragmentation is enabled)
547  deserializer >> bundle;
548 
549  // check the bundle
550  if ( ( bundle.destination == EID() ) || ( bundle.source == EID() ) )
551  {
552  // invalid bundle!
553  throw dtn::data::Validator::RejectedException("destination or source EID is null");
554  }
555 
556  // push bundle through the filter routines
557  context.setBundle(bundle);
    // NOTE(review): listing line 558 (the filter call that yields `ret`) is missing here
559 
560  switch (ret) {
561  case BundleFilter::ACCEPT:
562  // raise default bundle received event
563  dtn::net::BundleReceivedEvent::raise(_peer._localeid, bundle, false);
564  break;
565 
566  case BundleFilter::REJECT:
567  throw dtn::data::Validator::RejectedException("rejected by input filter");
568  break;
569 
570  case BundleFilter::DROP:
    // dropped bundles are discarded without an event
571  break;
572  }
573  }
    // NOTE(review): listing line 574 (a catch clause header binding `ex`) is missing here
575  {
576  // bundle rejected
    // NOTE(review): listing line 577 is missing here
578 
579  // display the rejection
580  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 2) << "bundle has been rejected: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
581  }
582  catch (const dtn::InvalidDataException &ex) {
583  // bundle rejected
    // NOTE(review): listing line 584 is missing here
585 
586  // display the rejection
587  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 2) << "invalid bundle-data received: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
588  }
589 
590  yield();
591  }
592  } catch (const ibrcommon::ThreadException &ex) {
593  IBRCOMMON_LOGGER_TAG(TCPConnection::TAG, error) << "failed to start thread in TCPConnection\n" << ex.what() << IBRCOMMON_LOGGER_ENDL;
594  try {
    // NOTE(review): listing line 595 is missing here
596  } catch (const ibrcommon::Exception&) {};
597  } catch (const std::exception &ex) {
598  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "run(): std::exception (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
599  try {
    // NOTE(review): listing line 600 is missing here
601  } catch (const ibrcommon::Exception&) {};
602  }
603  }
604 
// Returns a safe_streamconnection built from the protocol stream pointer and its
// mutex. The exception specification permits ibrcommon::Exception.
// NOTE(review): presumably the returned object holds the mutex for its lifetime and
// rejects a NULL stream - confirm in safe_streamconnection.
605  TCPConnection::safe_streamconnection TCPConnection::getProtocolStream() throw (ibrcommon::Exception)
606  {
607  return safe_streamconnection(_protocol_stream, _protocol_stream_mutex);
608  }
609 
// Binds the keepalive sender to its connection and to the connection's keepalive
// time-out, which is received by reference.
//  connection        - connection whose keepalive() is called by run()
//  keepalive_timeout - value later passed to the conditional's wait() in run()
610  TCPConnection::KeepaliveSender::KeepaliveSender(TCPConnection &connection, size_t &keepalive_timeout)
611  : _connection(connection), _keepalive_timeout(keepalive_timeout)
612  {
613 
614  }
615 
// Destructor with an empty body; no explicit clean-up is done here.
616  TCPConnection::KeepaliveSender::~KeepaliveSender()
617  {
618  }
619 
// Thread body of the keepalive sender: waits on the conditional with the configured
// time-out and calls keepalive() on the connection each time the wait times out.
// Any other abort reason, or any std::exception, ends the loop. Declared throw().
620  void TCPConnection::KeepaliveSender::run() throw ()
621  {
622  try {
623  ibrcommon::MutexLock guard(_wait);
624 
625  for (;;)
626  {
627  try {
628  _wait.wait(_keepalive_timeout);
629  } catch (const ibrcommon::Conditional::ConditionalAbortException &abort) {
630  // every abort reason except a time-out terminates the loop
631  if (abort.reason != ibrcommon::Conditional::ConditionalAbortException::COND_TIMEOUT)
632  {
633  throw;
634  }
635 
636  // the wait timed out: send a keepalive
637  _connection.keepalive();
638  }
639  }
640  } catch (const std::exception&) { }
641  }
642 
// Cancellation hook: takes the lock on the conditional and aborts it, which makes
// the wait() in run() throw an abort exception that ends the loop there.
643  void TCPConnection::KeepaliveSender::__cancellation() throw ()
644  {
645  ibrcommon::MutexLock l(_wait);
646  _wait.abort();
647  }
648 
// Binds the sender to the connection it transmits bundles for.
649  TCPConnection::Sender::Sender(TCPConnection &connection)
650  : _connection(connection)
651  {
652  }
653 
// Destructor with an empty body; no explicit clean-up is done here.
654  TCPConnection::Sender::~Sender()
655  {
656  }
657 
// Cancellation hook: aborts the transfer queue. run() handles the resulting
// QueueUnblockedException by returning.
658  void TCPConnection::Sender::__cancellation() throw ()
659  {
660  // cancel the main thread in here
661  ibrcommon::Queue<dtn::net::BundleTransfer>::abort();
662  }
663 
// Thread body of the sender: takes transfers from the queue, loads each bundle from
// storage, runs it through the filter and serializes it (or a fragment starting at
// the resume offset) onto the protocol stream. A QueueUnblockedException ends the
// method quietly; any other std::exception is logged and the connection is stopped.
// NOTE(review): listing lines 667, 693, 700, 707 and 747 are missing from this
// extract. `storage` and `ret` are defined on missing lines, and the condition
// starting on listing line 706 has lost its second half.
664  void TCPConnection::Sender::run() throw ()
665  {
666  try {
    // NOTE(review): listing line 667 (definition of `storage`) is missing here
668 
669  TCPConnection::safe_streamconnection sc = _connection.getProtocolStream();
670  std::iostream &stream = (*sc);
671 
672  // create a filter context
673  dtn::core::FilterContext context;
674  context.setPeer(_connection._peer._localeid);
675  context.setProtocol(_connection._callback.getDiscoveryProtocol());
676 
677  // create a serializer
678  dtn::data::DefaultSerializer serializer(stream);
679 
680  while (stream.good())
681  {
    // take the next transfer from the queue; the abort() issued by
    // __cancellation() surfaces here as QueueUnblockedException
682  dtn::net::BundleTransfer transfer = ibrcommon::Queue<dtn::net::BundleTransfer>::poll();
683 
684  // check if the transfer is directed to the connected neighbor
685  if (transfer.getNeighbor() != _connection.getNode().getEID()) continue;
686 
687  try {
688  // read the bundle out of the storage
689  dtn::data::Bundle bundle = storage.get(transfer.getBundle());
690 
691  // push bundle through the filter routines
692  context.setBundle(bundle);
    // NOTE(review): listing line 693 (the filter call that yields `ret`) is missing here
694 
695  switch (ret) {
696  case BundleFilter::ACCEPT:
697  break;
698  case BundleFilter::REJECT:
699  case BundleFilter::DROP:
    // NOTE(review): listing line 700 is missing here
701  continue;
702  }
703 
704  // send bundle
705  // get the offset, if this bundle has been reactively fragmented before
706  if (dtn::daemon::Configuration::getInstance().getNetwork().doFragmentation()
    // NOTE(review): listing line 707 (rest of the condition) is missing here
708  {
709  _connection._resume_offset = dtn::core::FragmentManager::getOffset(_connection.getNode().getEID(), bundle);
710  }
711  else
712  {
713  _connection._resume_offset = 0;
714  }
715 
716  // put the bundle into the sentqueue
717  _connection._sentqueue.push(transfer);
718 
719  try {
720  // activate exceptions for this method
721  if (!stream.good()) throw ibrcommon::IOException("stream went bad");
722 
723  if (_connection._resume_offset > 0)
724  {
725  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 4) << "Resume transfer of bundle " << bundle.toString() << " to " << _connection.getNode().getEID().getString() << ", offset: " << _connection._resume_offset << IBRCOMMON_LOGGER_ENDL;
726 
727  // transmit the fragment
728  serializer << dtn::data::BundleFragment(bundle, _connection._resume_offset, -1);
729  }
730  else
731  {
732  // transmit the bundle
733  serializer << bundle;
734  }
735 
736  // flush the stream
737  stream << std::flush;
738  } catch (const ibrcommon::Exception &ex) {
739  // the connection not available
740  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "connection error: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
741 
742  // forward exception
743  throw;
744  }
745  } catch (const dtn::storage::NoBundleFoundException&) {
746  // send transfer aborted event
    // NOTE(review): listing line 747 is missing here
748  }
749 
750  // idle a little bit
751  yield();
752  }
753  } catch (const ibrcommon::QueueUnblockedException &ex) {
754  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 50) << "Sender::run(): aborted" << IBRCOMMON_LOGGER_ENDL;
    // regular cancellation: leave without stopping the connection
755  return;
756  } catch (const std::exception &ex) {
757  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConnection::TAG, 10) << "Sender terminated by exception: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
758  }
759 
    // reached when the stream went bad or an exception ended the loop
760  _connection.stop();
761  }
762 
// NOTE(review): the signature line (listing line 763) and listing line 773 are
// missing from this extract. finally() calls clearQueue(), and this is presumably
// its body - confirm.
// Empties the sent queue. For an entry that meets the condition on the missing
// line, the acknowledged offset is recorded in the fragment manager.
764  {
765  // requeue all bundles still in transit
766  ibrcommon::Queue<dtn::net::BundleTransfer>::Locked l = _sentqueue.exclusive();
767 
768  while (!l.empty())
769  {
770  // get the job on top of the sent queue
771  const dtn::net::BundleTransfer &job = l.front();
772 
    // NOTE(review): listing line 773 (the condition guarding this block) is missing here
774  {
775  // some data are already acknowledged
776  // store this information in the fragment manager
777  dtn::core::FragmentManager::setOffset(_peer.getEID(), job.getBundle(), _lastack, _resume_offset);
778  }
779 
780  // set last ack to zero
781  _lastack = 0;
782 
783  // release the job
784  l.pop();
785  }
786  }
787 
788 #ifdef WITH_TLS
// Only compiled with WITH_TLS.
// NOTE(review): listing line 791, the only statement of this method, is missing
// from this extract, so what it does cannot be stated here.
789  void dtn::net::TCPConnection::enableTLS()
790  {
792  }
793 #endif
794 
// NOTE(review): signature line (listing line 795) is missing from this extract.
// KeepaliveSender::run() calls keepalive() on the connection, and this is
// presumably its body - confirm.
// Calls keepalive() on the protocol stream obtained through getProtocolStream().
796  {
797  (*getProtocolStream()).keepalive();
798  }
799 
// Reports whether the underlying socket stream is usable.
// Returns false while no socket stream exists: the pointer starts as NULL and is
// only created by __setup_socket(). Otherwise returns the stream's own good() state.
800  bool TCPConnection::good() const
801  {
    // guard against a NULL stream, as the other users of _socket_stream do
802  return (_socket_stream != NULL) && _socket_stream->good();
803  }
804 
// Tear-down hook of the sender thread; the body is empty.
805  void TCPConnection::Sender::finally() throw ()
806  {
807  }
808 
// NOTE(review): signature line (listing line 809) is missing from this extract; the
// body compares against a node named `n`.
// True when the connection's node compares equal to `n`.
810  {
811  return (_node == n);
812  }
813 
// Returns the result of sameHost(destination) on the EID of the connection's node.
//  destination - EID to compare against
814  bool TCPConnection::match(const dtn::data::EID &destination) const
815  {
816  return _node.getEID().sameHost(destination);
817  }
818 
// True when the node carried by the event matches this connection's node.
//  evt - node event whose node is compared
819  bool TCPConnection::match(const NodeEvent &evt) const
820  {
821  // delegate to the node-based overload
822  return match(evt.getNode());
823  }
824  }
825 }
static Configuration & getInstance(bool reset=false)
std::string toString() const
Definition: BundleID.cpp:190
void decode(std::string &address, unsigned int &port) const
Definition: Node.cpp:47
static void raise(State, const dtn::core::Node &)
static dtn::data::EID local
Definition: BundleCore.h:79
const Configuration::Security & getSecurity() const
dtn::data::Bitset< HEADER_BITS > _flags
bool sameHost(const std::string &other) const
Definition: EID.cpp:322
size_t Length
Definition: Number.h:33
void setBundle(const dtn::data::Bundle &data)
bool getBit(E flag) const
Definition: Number.h:66
dtn::core::Node::Protocol getDiscoveryProtocol() const
const dtn::core::Node & getNode() const
bool match(const dtn::core::Node &n) const
virtual void eventTimeout()
std::string getHost() const
Definition: EID.cpp:475
virtual void addTrafficIn(size_t)
bool TLSRequired() const
Checks if TLS is required.
static void setOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id, const dtn::data::Length &abs_offset, const dtn::data::Length &frag_offset)
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
Definition: BundleCore.cpp:598
virtual void eventBundleForwarded()
void setProtocol(const dtn::core::Node::Protocol &protocol)
void queue(const dtn::net::BundleTransfer &job)
const Configuration::Network & getNetwork() const
bool get(FLAGS flag) const
void setPeer(const dtn::data::EID &endpoint)
const dtn::data::EID & getNeighbor() const
std::string value
Definition: Node.h:89
virtual void eventBundleRefused()
EID getNode() const
Definition: EID.cpp:528
virtual void addTrafficOut(size_t)
void setFragmentationSupport(bool val)
Definition: Serializer.cpp:547
void add(const URI &u)
Definition: Node.cpp:280
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
std::list< URI > get(Node::Protocol proto) const
Definition: Node.cpp:325
static std::string toString(const Node::Type type)
Definition: Node.cpp:152
void clear()
Definition: Node.cpp:302
void abort(const TransferAbortedEvent::AbortReason reason)
virtual void eventConnectionDown()
const dtn::data::MetaBundle & getBundle() const
std::string getString() const
Definition: EID.cpp:374
const dtn::data::EID & getEID() const
Definition: Node.cpp:406
const Node & getNode() const
Definition: NodeEvent.cpp:55
dtn::data::Length getTCPChunkSize() const
dtn::storage::BundleStorage & getStorage()
Definition: BundleCore.cpp:237
virtual void eventShutdown(dtn::streams::StreamConnection::ConnectionShutdownCases csc)
virtual void eventConnectionUp(const dtn::streams::StreamContactHeader &header)
dtn::data::Timeout getTCPIdleTimeout() const
static dtn::data::Length getOffset(const dtn::data::EID &peer, const dtn::data::BundleID &id)
static void validateSubject(X509 *certificate, const std::string &cn)
Validates if the CommonName in the given X509 certificate corresponds to the given EID...
const dtn::streams::StreamContactHeader & getHeader() const
virtual void eventBundleAck(const dtn::data::Length &ack)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
TCPConnection(TCPConvergenceLayer &tcpsrv, const dtn::core::Node &node, ibrcommon::clientsocket *sock, const size_t timeout=10)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82