IBR-DTN  1.0.0
TCPConvergenceLayer.cpp
Go to the documentation of this file.
1 /*
2  * TCPConvergenceLayer.cpp
3  *
4  * Copyright (C) 2011 IBR, TU Braunschweig
5  *
6  * Written-by: Johannes Morgenroth <morgenroth@ibr.cs.tu-bs.de>
7  *
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  * http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  *
20  */
21 
22 #include "Configuration.h"
23 
25 #include "net/ConnectionEvent.h"
26 #include "net/DiscoveryAgent.h"
27 #include "core/BundleCore.h"
28 #include "core/EventDispatcher.h"
29 
30 #include <ibrcommon/net/vinterface.h>
31 #include <ibrcommon/thread/MutexLock.h>
32 #include <ibrcommon/Logger.h>
33 #include <ibrcommon/net/socket.h>
34 #include <ibrcommon/Logger.h>
35 
36 #include <streambuf>
37 #include <functional>
38 #include <list>
39 #include <algorithm>
40 
41 #ifdef WITH_TLS
42 #include <ibrcommon/ssl/TLSStream.h>
43 #endif
44 
45 namespace dtn
46 {
47  namespace net
48  {
49  /*
50  * class TCPConvergenceLayer
51  */
52  const std::string TCPConvergenceLayer::TAG = "TCPConvergenceLayer";
53 
54  const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
55 
57  : _vsocket_state(false), _any_port(0), _stats_in(0), _stats_out(0),
58  _keepalive_timeout( dtn::daemon::Configuration::getInstance().getNetwork().getKeepaliveInterval() )
59  {
60  }
61 
63  {
64  // unsubscribe to NetLink events
65  ibrcommon::LinkManager::getInstance().removeEventListener(this);
66 
67  // un-register as discovery handler
69 
70  join();
71 
72  // delete all sockets
73  _vsocket.destroy();
74  }
75 
76  void TCPConvergenceLayer::add(const ibrcommon::vinterface &net, int port) throw ()
77  {
78  // do not allow any futher binding if we already bound to any interface
79  if (_any_port > 0) return;
80 
81  if (net.isAny()) {
82  // bind to any interface
83  _vsocket.add(new ibrcommon::tcpserversocket(port));
84  _any_port = port;
85  } else if (net.isLoopback()) {
86  // bind to v6 loopback address if supported
87  if (ibrcommon::basesocket::hasSupport(AF_INET6)) {
88  ibrcommon::vaddress addr6(ibrcommon::vaddress::VADDR_LOCALHOST, port, AF_INET6);
89  _vsocket.add(new ibrcommon::tcpserversocket(addr6));
90  }
91 
92  // bind to v4 loopback address
93  ibrcommon::vaddress addr4(ibrcommon::vaddress::VADDR_LOCALHOST, port, AF_INET);
94  _vsocket.add(new ibrcommon::tcpserversocket(addr4));
95  } else {
96  listen(net, port);
97  }
98  }
99 
100  void TCPConvergenceLayer::listen(const ibrcommon::vinterface &net, int port) throw ()
101  {
102  try {
103  // add the new interface to internal data-structures
104  {
105  ibrcommon::MutexLock l(_interface_lock);
106 
107  // only add the interface once
108  if (_interfaces.find(net) != _interfaces.end()) return;
109 
110  // store the new interface in the list of interfaces
111  _interfaces.insert(net);
112  }
113 
114  // subscribe to NetLink events on our interfaces
115  ibrcommon::LinkManager::getInstance().addEventListener(net, this);
116 
117  // register as discovery handler for this interface
119 
120  // store port of the interface
121  {
122  ibrcommon::MutexLock l(_portmap_lock);
123  _portmap[net] = port;
124  }
125 
126  // create sockets for all addresses on the interface
127  // may throw "interface_not_set"
128  std::list<ibrcommon::vaddress> addrs = net.getAddresses();
129 
130  // convert the port into a string
131  std::stringstream ss; ss << port;
132 
133  for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
134  ibrcommon::vaddress &addr = (*iter);
135 
136  // handle the addresses according to their family
137  // may throw "address_exception"
138  try {
139  switch (addr.family()) {
140  case AF_INET:
141  case AF_INET6:
142  {
143  addr.setService(ss.str());
144  ibrcommon::tcpserversocket *sock = new ibrcommon::tcpserversocket(addr);
145  if (_vsocket_state) sock->up();
146  _vsocket.add(sock, net);
147 
148  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << "bound to " << net.toString() << " (" << addr.toString() << ", family: " << addr.family() << ")" << IBRCOMMON_LOGGER_ENDL;
149  break;
150  }
151  default:
152  break;
153  }
154  } catch (const ibrcommon::vaddress::address_exception &ex) {
155  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
156  } catch (const ibrcommon::socket_exception &ex) {
157  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
158  }
159  }
160  } catch (const ibrcommon::vinterface::interface_not_set &ex) {
161  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
162  }
163  }
164 
165  void TCPConvergenceLayer::unlisten(const ibrcommon::vinterface &iface) throw ()
166  {
167  ibrcommon::socketset socks = _vsocket.get(iface);
168  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
169  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
170  _vsocket.remove(sock);
171  try {
172  sock->down();
173  } catch (const ibrcommon::socket_exception &ex) {
174  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
175  }
176  delete sock;
177  }
178 
179  {
180  ibrcommon::MutexLock l(_portmap_lock);
181  _portmap.erase(iface);
182  }
183 
184  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << "unbound from " << iface.toString() << IBRCOMMON_LOGGER_ENDL;
185  }
186 
188  {
190  }
191 
193  {
194  ibrcommon::MutexLock l(_interface_lock);
195 
196  // announce port only if we are bound to any interface
197  if (_interfaces.empty() && (_any_port > 0)) {
198  std::stringstream service;
199  // ... set the port only
200  ibrcommon::MutexLock l(_portmap_lock);
201  service << "port=" << _portmap[iface] << ";";
202  beacon.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
203  return;
204  }
205 
206  // determine if we should enable crosslayer discovery by sending out our own address
208 
209  // this marker should set to true if we added an service description
210  bool announced = false;
211 
212  // search for the matching interface
213  for (std::set<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); ++it)
214  {
215  const ibrcommon::vinterface &it_iface = *it;
216  if (it_iface == iface)
217  {
218  try {
219  // check if cross layer discovery is disabled
220  if (!crosslayer) throw ibrcommon::Exception("crosslayer discovery disabled!");
221 
222  // get all addresses of this interface
223  std::list<ibrcommon::vaddress> list = it_iface.getAddresses();
224 
225  // if no address is returned... (goto catch block)
226  if (list.empty()) throw ibrcommon::Exception("no address found");
227 
228  for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
229  {
230  const ibrcommon::vaddress &addr = (*addr_it);
231 
232  if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL) continue;
233 
234  try {
235  // do not announce non-IP addresses
236  sa_family_t f = addr.family();
237  if ((f != AF_INET) && (f != AF_INET6)) continue;
238 
239  std::stringstream service;
240  // fill in the ip address
241  ibrcommon::MutexLock l(_portmap_lock);
242  service << "ip=" << addr.address() << ";port=" << _portmap[iface] << ";";
243  beacon.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
244 
245  // set the announce mark
246  announced = true;
247  } catch (const ibrcommon::vaddress::address_exception &ex) {
248  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
249  }
250  }
251  } catch (const ibrcommon::Exception &ex) {
252  // address collection process aborted
253  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 65) << "Address collection aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
254  };
255 
256  // if we still not announced anything...
257  if (!announced) {
258  std::stringstream service;
259  // ... set the port only
260  ibrcommon::MutexLock l(_portmap_lock);
261  service << "port=" << _portmap[iface] << ";";
262  beacon.addService( DiscoveryService(getDiscoveryProtocol(), service.str()));
263  }
264  return;
265  }
266  }
267 
269  }
270 
271  const std::string TCPConvergenceLayer::getName() const
272  {
273  return TCPConvergenceLayer::TAG;
274  }
275 
277  {
278  switch (dialup.type)
279  {
281  {
282  // listen to the new interface
283  listen(dialup.iface, 4556);
284  break;
285  }
286 
288  {
289  // check if the interface is bound by us
290  {
291  ibrcommon::MutexLock l(_interface_lock);
292 
293  // only remove the interface if it exists
294  if (_interfaces.find(dialup.iface) == _interfaces.end()) return;
295 
296  // remove the interface from the stored set
297  _interfaces.erase(dialup.iface);
298  }
299 
300  // un-subscribe to NetLink events on our interfaces
301  ibrcommon::LinkManager::getInstance().removeEventListener(dialup.iface, this);
302 
303  // un-register as discovery handler for this interface
305 
306  // remove all sockets on this interface
307  unlisten(dialup.iface);
308  break;
309  }
310  }
311  }
312 
313  void TCPConvergenceLayer::eventNotify(const ibrcommon::LinkEvent &evt)
314  {
315  // do not do anything if we are bound on all interfaces
316  if (_any_port > 0) return;
317 
318  {
319  ibrcommon::MutexLock l(_interface_lock);
320  if (_interfaces.find(evt.getInterface()) == _interfaces.end()) return;
321  }
322 
323  switch (evt.getAction())
324  {
325  case ibrcommon::LinkEvent::ACTION_ADDRESS_ADDED:
326  {
327  ibrcommon::vaddress bindaddr = evt.getAddress();
328  // convert the port into a string
329  ibrcommon::MutexLock l(_portmap_lock);
330  std::stringstream ss; ss << _portmap[evt.getInterface()];
331  bindaddr.setService(ss.str());
332  ibrcommon::tcpserversocket *sock = new ibrcommon::tcpserversocket(bindaddr);
333  try {
334  sock->up();
335  _vsocket.add(sock, evt.getInterface());
336  } catch (const ibrcommon::socket_exception&) {
337  delete sock;
338  }
339  break;
340  }
341 
342  case ibrcommon::LinkEvent::ACTION_ADDRESS_REMOVED:
343  {
344  ibrcommon::socketset socks = _vsocket.getAll();
345  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
346  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
347  if (sock->get_address().address() == evt.getAddress().address()) {
348  _vsocket.remove(sock);
349  sock->down();
350  delete sock;
351  break;
352  }
353  }
354  break;
355  }
356 
357  case ibrcommon::LinkEvent::ACTION_LINK_DOWN:
358  {
359  // remove all sockets on this interface
360  const ibrcommon::vinterface &iface = evt.getInterface();
361 
362  ibrcommon::socketset socks = _vsocket.get(iface);
363  for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
364  ibrcommon::tcpserversocket *sock = dynamic_cast<ibrcommon::tcpserversocket*>(*iter);
365  _vsocket.remove(sock);
366  try {
367  sock->down();
368  } catch (const ibrcommon::socket_exception &ex) {
369  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
370  }
371  delete sock;
372  }
373  break;
374  }
375 
376  default:
377  break;
378  }
379  }
380 
382  {
383  // search for an existing connection
384  ibrcommon::MutexLock l(_connections_cond);
385 
386  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
387  {
388  TCPConnection &conn = *(*iter);
389 
390  if (conn.match(n))
391  {
392  return;
393  }
394  }
395 
396  try {
397  // create a connection
398  TCPConnection *conn = new TCPConnection(*this, n, NULL, _keepalive_timeout);
399 
400 #ifdef WITH_TLS
401  // enable TLS Support
402  if ( ibrcommon::TLSStream::isInitialized() )
403  {
404  conn->enableTLS();
405  }
406 #endif
407 
408  // raise setup event
410 
411  // add connection as pending
412  _connections.push_back( conn );
413 
414  // start the ClientHandler (service)
415  conn->initialize();
416 
417  // signal that there is a new connection
418  _connections_cond.signal(true);
419  } catch (const ibrcommon::Exception&) { };
420 
421  return;
422  }
423 
425  {
426  // search for an existing connection
427  ibrcommon::MutexLock l(_connections_cond);
428 
429  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
430  {
431  TCPConnection &conn = *(*iter);
432 
433  if (conn.match(n))
434  {
435  conn.queue(job);
436  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "queued bundle to an existing tcp connection (" << conn.getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
437 
438  return;
439  }
440  }
441 
442  try {
443  // create a connection
444  TCPConnection *conn = new TCPConnection(*this, n, NULL, _keepalive_timeout);
445 
446 #ifdef WITH_TLS
447  // enable TLS Support
448  if ( ibrcommon::TLSStream::isInitialized() )
449  {
450  conn->enableTLS();
451  }
452 #endif
453 
454  // raise setup event
456 
457  // add connection as pending
458  _connections.push_back( conn );
459 
460  // start the ClientHandler (service)
461  conn->initialize();
462 
463  // queue the bundle
464  conn->queue(job);
465 
466  // signal that there is a new connection
467  _connections_cond.signal(true);
468 
469  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "queued bundle to an new tcp connection (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
470  } catch (const ibrcommon::Exception&) {
471 
472  }
473  }
474 
475  void TCPConvergenceLayer::connectionUp(TCPConnection *conn)
476  {
477  ibrcommon::MutexLock l(_connections_cond);
478  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
479  {
480  if (conn == (*iter))
481  {
482  // put pending connection to the active connections
483  return;
484  }
485  }
486 
487  _connections.push_back( conn );
488 
489  // signal that there is a new connection
490  _connections_cond.signal(true);
491 
492  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "tcp connection added (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
493  }
494 
495  void TCPConvergenceLayer::connectionDown(TCPConnection *conn)
496  {
497  ibrcommon::MutexLock l(_connections_cond);
498  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
499  {
500  if (conn == (*iter))
501  {
502  _connections.erase(iter);
503  IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) << "tcp connection removed (" << conn->getNode().toString() << ")" << IBRCOMMON_LOGGER_ENDL;
504 
505  // signal that there is a connection less
506  _connections_cond.signal(true);
507  return;
508  }
509  }
510  }
511 
512  void TCPConvergenceLayer::addTrafficIn(size_t amount) throw ()
513  {
514  ibrcommon::MutexLock l(_stats_lock);
515  _stats_in += amount;
516  }
517 
518  void TCPConvergenceLayer::addTrafficOut(size_t amount) throw ()
519  {
520  ibrcommon::MutexLock l(_stats_lock);
521  _stats_out += amount;
522  }
523 
525  {
526  try {
527  while (true)
528  {
529  ibrcommon::socketset readfds;
530 
531  // wait for incoming connections
532  _vsocket.select(&readfds, NULL, NULL, NULL);
533 
534  for (ibrcommon::socketset::iterator iter = readfds.begin(); iter != readfds.end(); ++iter) {
535  try {
536  // assume that all sockets are serversockets
537  ibrcommon::serversocket &sock = dynamic_cast<ibrcommon::serversocket&>(**iter);
538 
539  // wait for incoming connections
540  ibrcommon::vaddress peeraddr;
541  ibrcommon::clientsocket *client = sock.accept(peeraddr);
542 
543  // create a EID based on the peer address
544  dtn::data::EID source("tcp://" + peeraddr.address() + ":" + peeraddr.service());
545 
546  // create a new node object
547  dtn::core::Node node(source);
548 
549  // add TCP connection
550  const std::string uri = "ip=" + peeraddr.address() + ";port=" + peeraddr.service() + ";";
551  node.add( dtn::core::Node::URI(Node::NODE_CONNECTED, Node::CONN_TCPIP, uri, 0, 10) );
552 
553  // create a new TCPConnection and return the pointer
554  TCPConnection *obj = new TCPConnection(*this, node, client, _keepalive_timeout);
555 
556 #ifdef WITH_TLS
557  // enable TLS Support
558  if ( ibrcommon::TLSStream::isInitialized() )
559  {
560  obj->enableTLS();
561  }
562 #endif
563 
564  // add the connection to the connection list
565  connectionUp(obj);
566 
567  // initialize the object
568  obj->initialize();
569  } catch (const std::bad_cast&) {
570 
571  }
572  }
573 
574  // breakpoint
575  ibrcommon::Thread::yield();
576  }
577  } catch (const std::exception&) {
578  // ignore all errors
579  return;
580  }
581  }
582 
584  {
585  _vsocket.down();
586  }
587 
588  void TCPConvergenceLayer::closeAll()
589  {
590  // search for an existing connection
591  ibrcommon::MutexLock l(_connections_cond);
592  for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
593  {
594  TCPConnection &conn = *(*iter);
595 
596  // close the connection immediately
597  conn.shutdown();
598  }
599  }
600 
602  {
603  // listen on P2P dial-up events
605 
606  // routine checked for throw() on 15.02.2013
607  try {
608  // listen on the socket
609  _vsocket.up();
610  _vsocket_state = true;
611  } catch (const ibrcommon::socket_exception &ex) {
612  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) << "bind failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
613  }
614  }
615 
617  {
618  // un-listen on P2P dial-up events
620 
621  // routine checked for throw() on 15.02.2013
622  try {
623  // shutdown all sockets
624  _vsocket.down();
625  _vsocket_state = false;
626  } catch (const ibrcommon::socket_exception &ex) {
627  IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) << "shutdown failed (" << ex.what() << ")" << IBRCOMMON_LOGGER_ENDL;
628  }
629 
630  // close all active connections
631  closeAll();
632 
633  // wait until all tcp connections are down
634  {
635  ibrcommon::MutexLock l(_connections_cond);
636  while (_connections.size() > 0) _connections_cond.wait();
637  }
638  }
639  }
640 
641  void TCPConvergenceLayer::getStats(ConvergenceLayer::stats_data &data) const
642  {
643  std::stringstream ss_format;
644 
645  static const std::string IN_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|in";
646  static const std::string OUT_TAG = dtn::core::Node::toString(getDiscoveryProtocol()) + "|out";
647 
648  ss_format << _stats_in;
649  data[IN_TAG] = ss_format.str();
650  ss_format.str("");
651 
652  ss_format << _stats_out;
653  data[OUT_TAG] = ss_format.str();
654  }
655 
656  void TCPConvergenceLayer::resetStats()
657  {
658  _stats_in = 0;
659  _stats_out = 0;
660  }
661 }
static Configuration & getInstance(bool reset=false)
static void raise(State, const dtn::core::Node &)
const Configuration::Discovery & getDiscovery() const
static void add(EventReceiver< E > *receiver)
dtn::core::Node::Protocol getDiscoveryProtocol() const
const dtn::core::Node & getNode() const
void open(const dtn::core::Node &n)
bool match(const dtn::core::Node &n) const
static void remove(const EventReceiver< E > *receiver)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
Definition: BundleCore.cpp:265
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void queue(const dtn::net::BundleTransfer &job)
void onUpdateBeacon(const ibrcommon::vinterface &iface, DiscoveryBeacon &beacon)
void add(const ibrcommon::vinterface &net, int port)
std::map< string, string > stats_data
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
static std::string toString(const Node::Type type)
Definition: Node.cpp:152
std::string toString() const
Definition: Node.cpp:510
void raiseEvent(const dtn::net::P2PDialupEvent &evt)
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
void eventNotify(const ibrcommon::LinkEvent &evt)
static BundleCore & getInstance()
Definition: BundleCore.cpp:82
virtual const std::string getName() const