32 #include <ibrcommon/net/socket.h>
33 #include <ibrcommon/net/vaddress.h>
34 #include <ibrcommon/net/vinterface.h>
35 #include <ibrcommon/data/BLOB.h>
36 #include <ibrcommon/Logger.h>
37 #include <ibrcommon/thread/MutexLock.h>
39 #include <sys/types.h>
52 using namespace dtn::data;
58 const int UDPConvergenceLayer::DEFAULT_PORT = 4556;
60 UDPConvergenceLayer::UDPConvergenceLayer(ibrcommon::vinterface net,
int port,
dtn::data::Length mtu)
61 : _net(net), _port(port), m_maxmsgsize(mtu),
_running(false), _stats_in(0), _stats_out(0)
78 std::stringstream ss_format;
83 ss_format << _stats_in;
84 data[IN_TAG] = ss_format.str();
87 ss_format << _stats_out;
88 data[OUT_TAG] = ss_format.str();
100 std::stringstream service;
102 service <<
"port=" << _port <<
";";
103 announcement.addService(
DiscoveryService(getDiscoveryProtocol(), service.str()));
114 bool announced =
false;
118 if (!crosslayer)
throw ibrcommon::Exception(
"crosslayer discovery disabled!");
121 std::list<ibrcommon::vaddress> list = _net.getAddresses();
124 if (list.empty())
throw ibrcommon::Exception(
"no address found");
126 for (std::list<ibrcommon::vaddress>::const_iterator addr_it = list.begin(); addr_it != list.end(); ++addr_it)
128 const ibrcommon::vaddress &addr = (*addr_it);
131 if (addr.scope() != ibrcommon::vaddress::SCOPE_GLOBAL)
continue;
135 sa_family_t f = addr.family();
136 if ((f != AF_INET) && (f != AF_INET6))
continue;
138 std::stringstream service;
140 service <<
"ip=" << addr.address() <<
";port=" << _port <<
";";
141 announcement.addService(
DiscoveryService(getDiscoveryProtocol(), service.str()));
145 }
catch (
const ibrcommon::vaddress::address_exception &ex) {
146 IBRCOMMON_LOGGER_DEBUG_TAG(
"UDPConvergenceLayer", 25) << ex.what() << IBRCOMMON_LOGGER_ENDL;
149 }
catch (
const ibrcommon::Exception&) {
156 std::stringstream service;
157 service <<
"port=" << _port <<
";";
158 announcement.addService(
DiscoveryService(getDiscoveryProtocol(), service.str()));
165 if (uri_list.empty())
176 std::string address =
"0.0.0.0";
177 unsigned int port = 0;
180 uri.
decode(address, port);
183 ibrcommon::vaddress addr(address, port);
199 case BundleFilter::ACCEPT:
201 case BundleFilter::REJECT:
202 case BundleFilter::DROP:
220 if (size > m_maxmsgsize)
230 const size_t fragment_size = m_maxmsgsize - header;
231 const size_t fragment_count = (psize / fragment_size) + (((psize % fragment_size) > 0) ? 1 : 0);
233 IBRCOMMON_LOGGER_DEBUG_TAG(
"UDPConvergenceLayer", 30) <<
"MTU of " << m_maxmsgsize <<
" is too small to carry " << psize <<
" bytes of payload." << IBRCOMMON_LOGGER_ENDL;
234 IBRCOMMON_LOGGER_DEBUG_TAG(
"UDPConvergenceLayer", 30) <<
"create " << fragment_count <<
" fragments with " << fragment_size <<
" bytes each." << IBRCOMMON_LOGGER_ENDL;
236 for (
size_t i = 0; i < fragment_count; ++i)
240 std::stringstream ss;
243 serializer << fragment;
244 std::string data = ss.str();
252 std::stringstream ss;
255 serializer << bundle;
256 std::string data = ss.str();
269 }
catch (
const ibrcommon::socket_exception&) {
278 void UDPConvergenceLayer::send(
const ibrcommon::vaddress &addr,
const std::string &data)
throw (ibrcommon::socket_exception,
NoAddressFoundException)
281 ibrcommon::MutexLock l(m_writelock);
284 ibrcommon::socketset socks = _vsocket.getAll();
285 for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
286 ibrcommon::udpsocket &sock =
dynamic_cast<ibrcommon::udpsocket&
>(**iter);
289 sock.sendto(data.c_str(), data.length(), 0, addr);
292 _stats_out += data.length();
304 ibrcommon::MutexLock l(m_readlock);
306 std::vector<char> data(m_maxmsgsize);
309 ibrcommon::socketset readfds;
312 _vsocket.select(&readfds, NULL, NULL, NULL);
314 if (readfds.size() > 0) {
315 ibrcommon::datagramsocket *sock =
static_cast<ibrcommon::datagramsocket*
>(*readfds.begin());
317 ibrcommon::vaddress fromaddr;
318 size_t len = sock->recvfrom(&data[0], m_maxmsgsize, 0, fromaddr);
323 std::stringstream ss; ss <<
"udp://" << fromaddr.toString();
330 ss.write(&data[0], len);
340 if (evt.getInterface() != _net)
return;
342 switch (evt.getAction())
344 case ibrcommon::LinkEvent::ACTION_ADDRESS_ADDED:
346 ibrcommon::vaddress bindaddr = evt.getAddress();
348 std::stringstream ss; ss << _port;
349 bindaddr.setService(ss.str());
350 ibrcommon::udpsocket *sock =
new ibrcommon::udpsocket(bindaddr);
353 _vsocket.add(sock, evt.getInterface());
354 }
catch (
const ibrcommon::socket_exception&) {
360 case ibrcommon::LinkEvent::ACTION_ADDRESS_REMOVED:
362 ibrcommon::socketset socks = _vsocket.getAll();
363 for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
364 ibrcommon::udpsocket *sock =
dynamic_cast<ibrcommon::udpsocket*
>(*iter);
365 if (sock->get_address().address() == evt.getAddress().address()) {
366 _vsocket.remove(sock);
375 case ibrcommon::LinkEvent::ACTION_LINK_DOWN:
377 ibrcommon::socketset socks = _vsocket.get(evt.getInterface());
378 for (ibrcommon::socketset::iterator iter = socks.begin(); iter != socks.end(); ++iter) {
379 ibrcommon::udpsocket *sock =
dynamic_cast<ibrcommon::udpsocket*
>(*iter);
380 _vsocket.remove(sock);
397 std::list<ibrcommon::vaddress> addrs = _net.getAddresses();
400 std::stringstream ss; ss << _port;
402 for (std::list<ibrcommon::vaddress>::iterator iter = addrs.begin(); iter != addrs.end(); ++iter) {
403 ibrcommon::vaddress &addr = (*iter);
407 switch (addr.family()) {
410 addr.setService(ss.str());
411 _vsocket.add(
new ibrcommon::udpsocket(addr), _net);
416 }
catch (
const ibrcommon::vaddress::address_exception &ex) {
417 IBRCOMMON_LOGGER_TAG(
"UDPConvergenceLayer", warning) << ex.what() << IBRCOMMON_LOGGER_ENDL;
424 ibrcommon::LinkManager::getInstance().addEventListener(_net,
this);
428 }
catch (
const ibrcommon::socket_exception &ex) {
429 IBRCOMMON_LOGGER_TAG(
"UDPConvergenceLayer", error) <<
"bind failed (" << ex.what() <<
")" << IBRCOMMON_LOGGER_ENDL;
436 ibrcommon::LinkManager::getInstance().removeEventListener(
this);
460 receive(bundle, sender);
467 case BundleFilter::ACCEPT:
472 case BundleFilter::REJECT:
473 case BundleFilter::DROP:
478 IBRCOMMON_LOGGER_DEBUG_TAG(
"UDPConvergenceLayer", 2) <<
"Received a invalid bundle: " << ex.what() << IBRCOMMON_LOGGER_ENDL;
479 }
catch (
const std::exception&) {
494 return "UDPConvergenceLayer";
static Configuration & getInstance(bool reset=false)
void decode(std::string &address, unsigned int &port) const
void eventNotify(const ibrcommon::LinkEvent &evt)
const Configuration::Discovery & getDiscovery() const
dtn::core::Node::Protocol getDiscoveryProtocol() const
virtual Length getLength(const dtn::data::Bundle &obj)
void setBundle(const dtn::data::Bundle &data)
bool enableCrosslayer() const
void queue(const dtn::core::Node &n, const dtn::net::BundleTransfer &job)
dtn::net::DiscoveryAgent & getDiscoveryAgent()
BundleFilter::ACTION filter(BundleFilter::TABLE table, const FilterContext &context, dtn::data::Bundle &bundle) const
virtual ~UDPConvergenceLayer()
void onUpdateBeacon(const ibrcommon::vinterface &iface, DiscoveryBeacon &announcement)
void unregisterService(const ibrcommon::vinterface &iface, const dtn::net::DiscoveryBeaconHandler *handler)
void setProtocol(const dtn::core::Node::Protocol &protocol)
bool get(FLAGS flag) const
void setPeer(const dtn::data::EID &endpoint)
virtual const std::string getName() const
std::map< string, string > stats_data
virtual dtn::data::Bundle get(const dtn::data::BundleID &id)=0
virtual void componentDown()
virtual void componentUp()
std::list< URI > get(Node::Protocol proto) const
void abort(const TransferAbortedEvent::AbortReason reason)
const dtn::data::MetaBundle & getBundle() const
std::string toString() const
virtual void resetStats()
const dtn::data::EID & getEID() const
dtn::storage::BundleStorage & getStorage()
virtual void componentRun()
void registerService(const ibrcommon::vinterface &iface, dtn::net::DiscoveryBeaconHandler *handler)
static void raise(const dtn::data::EID &peer, const dtn::data::Bundle &bundle, const bool local=false)
static BundleCore & getInstance()
virtual void getStats(ConvergenceLayer::stats_data &data) const