30 #include <ibrcommon/net/vinterface.h>
31 #include <ibrcommon/thread/MutexLock.h>
32 #include <ibrcommon/Logger.h>
33 #include <ibrcommon/net/socket.h>
34 #include <ibrcommon/Logger.h>
42 #include <ibrcommon/ssl/TLSStream.h>
52 const std::string TCPConvergenceLayer::TAG =
"TCPConvergenceLayer";
54 const int TCPConvergenceLayer::DEFAULT_PORT = 4556;
57 : _vsocket_state(false), _any_port(0), _stats_in(0), _stats_out(0),
58 _keepalive_timeout( dtn::daemon::Configuration::getInstance().getNetwork().getKeepaliveInterval() )
65 ibrcommon::LinkManager::getInstance().removeEventListener(
this);
79 if (_any_port > 0)
return;
83 _vsocket.add(
new ibrcommon::tcpserversocket(port));
85 }
else if (net.isLoopback()) {
87 if (ibrcommon::basesocket::hasSupport(AF_INET6)) {
88 ibrcommon::vaddress addr6(ibrcommon::vaddress::VADDR_LOCALHOST, port, AF_INET6);
89 _vsocket.add(
new ibrcommon::tcpserversocket(addr6));
93 ibrcommon::vaddress addr4(ibrcommon::vaddress::VADDR_LOCALHOST, port, AF_INET);
94 _vsocket.add(
new ibrcommon::tcpserversocket(addr4));
100 void TCPConvergenceLayer::listen(
const ibrcommon::vinterface &net,
int port)
throw ()
105 ibrcommon::MutexLock l(_interface_lock);
108 if (_interfaces.find(net) != _interfaces.end())
return;
111 _interfaces.insert(net);
115 ibrcommon::LinkManager::getInstance().addEventListener(net,
this);
122 ibrcommon::MutexLock l(_portmap_lock);
123 _portmap[net] = port;
128 std::list<ibrcommon::vaddress> addrs = net.getAddresses();
131 std::stringstream ss; ss << port;
133 for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
134 ibrcommon::vaddress &addr = (*iter);
139 switch (addr.family()) {
143 addr.setService(ss.str());
144 ibrcommon::tcpserversocket *sock =
new ibrcommon::tcpserversocket(addr);
145 if (_vsocket_state) sock->up();
146 _vsocket.add(sock, net);
148 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) <<
"bound to " << net.toString() <<
" (" << addr.toString() <<
", family: " << addr.family() <<
")" << IBRCOMMON_LOGGER_ENDL;
154 }
catch (
const ibrcommon::vaddress::address_exception &ex) {
155 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
156 }
catch (
const ibrcommon::socket_exception &ex) {
157 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
160 }
catch (
const ibrcommon::vinterface::interface_not_set &ex) {
161 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
165 void TCPConvergenceLayer::unlisten(
const ibrcommon::vinterface &iface)
throw ()
167 ibrcommon::socketset socks = _vsocket.get(iface);
168 for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
169 ibrcommon::tcpserversocket *sock =
dynamic_cast<ibrcommon::tcpserversocket*
>(*iter);
170 _vsocket.remove(sock);
173 }
catch (
const ibrcommon::socket_exception &ex) {
174 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
180 ibrcommon::MutexLock l(_portmap_lock);
181 _portmap.erase(iface);
184 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) <<
"unbound from " << iface.toString() << IBRCOMMON_LOGGER_ENDL;
194 ibrcommon::MutexLock l(_interface_lock);
197 if (_interfaces.empty() && (_any_port > 0)) {
198 std::stringstream service;
200 ibrcommon::MutexLock l(_portmap_lock);
201 service <<
"port=" << _portmap[iface] <<
";";
202 beacon.addService(
DiscoveryService(getDiscoveryProtocol(), service.str()));
210 bool announced =
false;
213 for (std::set<ibrcommon::vinterface>::const_iterator it = _interfaces.begin(); it != _interfaces.end(); ++it)
215 const ibrcommon::vinterface &it_iface = *it;
216 if (it_iface == iface)
220 if (!crosslayer)
throw ibrcommon::Exception(
"crosslayer discovery disabled!");
223 std::list<ibrcommon::vaddress> list = it_iface.getAddresses();
226 if (list.empty())
throw ibrcommon::Exception(
"no address found");
228 for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
230 const ibrcommon::vaddress &addr = (*addr_it);
232 if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL)
continue;
236 sa_family_t f = addr.family();
237 if ((f != AF_INET) && (f != AF_INET6))
continue;
239 std::stringstream service;
241 ibrcommon::MutexLock l(_portmap_lock);
242 service <<
"ip=" << addr.address() <<
";port=" << _portmap[iface] <<
";";
243 beacon.addService(
DiscoveryService(getDiscoveryProtocol(), service.str()));
247 }
catch (
const ibrcommon::vaddress::address_exception &ex) {
248 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
251 }
catch (
const ibrcommon::Exception &ex) {
253 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 65) <<
"Address collection aborted: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
258 std::stringstream service;
260 ibrcommon::MutexLock l(_portmap_lock);
261 service <<
"port=" << _portmap[iface] <<
";";
262 beacon.addService(
DiscoveryService(getDiscoveryProtocol(), service.str()));
273 return TCPConvergenceLayer::TAG;
283 listen(dialup.iface, 4556);
291 ibrcommon::MutexLock l(_interface_lock);
294 if (_interfaces.find(dialup.iface) == _interfaces.end())
return;
297 _interfaces.erase(dialup.iface);
301 ibrcommon::LinkManager::getInstance().removeEventListener(dialup.iface,
this);
307 unlisten(dialup.iface);
316 if (_any_port > 0)
return;
319 ibrcommon::MutexLock l(_interface_lock);
320 if (_interfaces.find(evt.getInterface()) == _interfaces.end())
return;
323 switch (evt.getAction())
325 case ibrcommon::LinkEvent::ACTION_ADDRESS_ADDED:
327 ibrcommon::vaddress bindaddr = evt.getAddress();
329 ibrcommon::MutexLock l(_portmap_lock);
330 std::stringstream ss; ss << _portmap[evt.getInterface()];
331 bindaddr.setService(ss.str());
332 ibrcommon::tcpserversocket *sock =
new ibrcommon::tcpserversocket(bindaddr);
335 _vsocket.add(sock, evt.getInterface());
336 }
catch (
const ibrcommon::socket_exception&) {
342 case ibrcommon::LinkEvent::ACTION_ADDRESS_REMOVED:
344 ibrcommon::socketset socks = _vsocket.getAll();
345 for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
346 ibrcommon::tcpserversocket *sock =
dynamic_cast<ibrcommon::tcpserversocket*
>(*iter);
347 if (sock->get_address().address() == evt.getAddress().address()) {
348 _vsocket.remove(sock);
357 case ibrcommon::LinkEvent::ACTION_LINK_DOWN:
360 const ibrcommon::vinterface &iface = evt.getInterface();
362 ibrcommon::socketset socks = _vsocket.get(iface);
363 for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
364 ibrcommon::tcpserversocket *sock =
dynamic_cast<ibrcommon::tcpserversocket*
>(*iter);
365 _vsocket.remove(sock);
368 }
catch (
const ibrcommon::socket_exception &ex) {
369 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
384 ibrcommon::MutexLock l(_connections_cond);
386 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
402 if ( ibrcommon::TLSStream::isInitialized() )
412 _connections.push_back( conn );
418 _connections_cond.signal(
true);
419 }
catch (
const ibrcommon::Exception&) { };
427 ibrcommon::MutexLock l(_connections_cond);
429 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
436 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) <<
"queued bundle to an existing tcp connection (" << conn.
getNode().
toString() <<
")" << IBRCOMMON_LOGGER_ENDL;
448 if ( ibrcommon::TLSStream::isInitialized() )
458 _connections.push_back( conn );
467 _connections_cond.signal(
true);
469 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) <<
"queued bundle to an new tcp connection (" << conn->
getNode().
toString() <<
")" << IBRCOMMON_LOGGER_ENDL;
470 }
catch (
const ibrcommon::Exception&) {
477 ibrcommon::MutexLock l(_connections_cond);
478 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
487 _connections.push_back( conn );
490 _connections_cond.signal(
true);
492 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) <<
"tcp connection added (" << conn->
getNode().
toString() <<
")" << IBRCOMMON_LOGGER_ENDL;
495 void TCPConvergenceLayer::connectionDown(
TCPConnection *conn)
497 ibrcommon::MutexLock l(_connections_cond);
498 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
502 _connections.erase(iter);
503 IBRCOMMON_LOGGER_DEBUG_TAG(TCPConvergenceLayer::TAG, 15) <<
"tcp connection removed (" << conn->
getNode().
toString() <<
")" << IBRCOMMON_LOGGER_ENDL;
506 _connections_cond.signal(
true);
512 void TCPConvergenceLayer::addTrafficIn(
size_t amount)
throw ()
514 ibrcommon::MutexLock l(_stats_lock);
518 void TCPConvergenceLayer::addTrafficOut(
size_t amount)
throw ()
520 ibrcommon::MutexLock l(_stats_lock);
521 _stats_out += amount;
529 ibrcommon::socketset readfds;
532 _vsocket.select(&readfds, NULL, NULL, NULL);
534 for (ibrcommon::socketset::iterator iter = readfds.begin(); iter != readfds.end(); ++iter) {
537 ibrcommon::serversocket &sock =
dynamic_cast<ibrcommon::serversocket&
>(**iter);
540 ibrcommon::vaddress peeraddr;
541 ibrcommon::clientsocket *client = sock.accept(peeraddr);
544 dtn::data::EID source(
"tcp://" + peeraddr.address() +
":" + peeraddr.service());
550 const std::string uri =
"ip=" + peeraddr.address() +
";port=" + peeraddr.service() +
";";
558 if ( ibrcommon::TLSStream::isInitialized() )
569 }
catch (
const std::bad_cast&) {
575 ibrcommon::Thread::yield();
577 }
catch (
const std::exception&) {
588 void TCPConvergenceLayer::closeAll()
591 ibrcommon::MutexLock l(_connections_cond);
592 for (std::list<TCPConnection*>::iterator iter = _connections.begin(); iter != _connections.end(); ++iter)
610 _vsocket_state =
true;
611 }
catch (
const ibrcommon::socket_exception &ex) {
612 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) <<
"bind failed (" << ex.what() <<
")" << IBRCOMMON_LOGGER_ENDL;
625 _vsocket_state =
false;
626 }
catch (
const ibrcommon::socket_exception &ex) {
627 IBRCOMMON_LOGGER_TAG(TCPConvergenceLayer::TAG, error) <<
"shutdown failed (" << ex.what() <<
")" << IBRCOMMON_LOGGER_ENDL;
635 ibrcommon::MutexLock l(_connections_cond);
636 while (_connections.size() > 0) _connections_cond.wait();
643 std::stringstream ss_format;
648 ss_format << _stats_in;
649 data[IN_TAG] = ss_format.str();
652 ss_format << _stats_out;
653 data[OUT_TAG] = ss_format.str();
656 void TCPConvergenceLayer::resetStats()
static Configuration & getInstance(bool reset=false)
static void raise(State, const dtn::core::Node &)
const Configuration::Discovery & getDiscovery() const
static void add(EventReceiver< E > *receiver)
friend class TCPConnection
dtn::core::Node::Protocol getDiscoveryProtocol() const
const dtn::core::Node & getNode() const
bool enableCrosslayer() const
void open(const dtn::core::Node &n)
bool match(const dtn::core::Node &n) const
static void remove(const EventReceiver< E > *receiver)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void queue(const dtn::net::BundleTransfer &job)
void onUpdateBeacon(const ibrcommon::vinterface &iface, DiscoveryBeacon &beacon)
void add(const ibrcommon::vinterface &net, int port)
std::map< string, string > stats_data
virtual ~TCPConvergenceLayer()
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
static std::string toString(const Node::Type type)
std::string toString() const
virtual void initialize()
void raiseEvent(const dtn::net::P2PDialupEvent &evt)
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
void eventNotify(const ibrcommon::LinkEvent &evt)
static BundleCore & getInstance()
virtual const std::string getName() const